1 /**
2 Implementation of [Graylog Extended Logging Format](http://docs.graylog.org/en/latest/pages/gelf.html) for `std.experimental.logger`.
3 Generated documents at http://gelf.code.kaleidic.io
4 
5 ##  Features
6 - Small and flexible API
7 - `GrayLogger` does not throw exceptions while it is sending a message
- UDP, TCP, and HTTP transports are supported
9 */
10 
11 /**
12 Example:
13 	---
14 		// UDP
15 		Compress c; // `null` value is for no compression
16 		// c = new Compress;
17 		// c = new Compress(HeaderFormat.gzip);
18 		auto socket = new UdpSocket();
19 		socket.connect(new InternetAddress("192.168.59.103", 12201));
		// The last param is the UDP chunk size. It is an optional parameter with a default value of 8192.
21 		sharedLog = new UdpGrayLogger(socket, c, "YourServiceName", LogLevel.all, 4096);
22 
23 		error("===== Error Information =====");
24 	---
25 */
26 
27 /**
28 Example:
29 	---
30 		// TCP
31 		import std.typecons: Yes, No;
32 		auto socket = new TcpSocket();
33 		socket.connect(new InternetAddress("192.168.59.103", 12201));
		// Default value for nullDelimeter is `Yes`.
35 		// Newline delimiter would be used if nullDelimeter is `false`/`No`.
36 		sharedLog = new TcpGrayLogger(socket, "YourServiceName", LogLevel.all, Yes.nullDelimeter);
37 
38 		error("===== Error Information =====");
	---
*/
40 
41 /**
42 Example:
43 	---
44 		// HTTP
45 		import std.net.curl: HTTP;
46 		Compress c; // `null` value is for no compression
47 		// c = new Compress;
48 		// c = new Compress(HeaderFormat.gzip);
49 		sharedLog = new HttpGrayLogger(HTTP("192.168.59.103:12201/gelf"), c, "YourServiceName", LogLevel.all);
50 
51 		error("===== Error Information =====");
52 	---
53 */
54 
55 module gelf;
56 
57 //version = gelf_test_udp;
58 //version = gelf_test_tcp;
59 //version = gelf_test_http;
60 
/// UDP
version(gelf_test_udp)
///
unittest
{
	import std.format: format;
	// Run the same scenario once per compression mode:
	// no compression, default `Compress`, and gzip-framed `Compress`.
	foreach(i, c; [Compress.init, new Compress, new Compress(HeaderFormat.gzip)])
	{
		auto socket = new UdpSocket();
		socket.connect(new InternetAddress("192.168.59.103", 12201));
		// Hand the loop's compressor `c` to the logger so that every
		// compression mode listed above is actually exercised.
		// 512 is the smallest chunk size the constructor accepts, which forces
		// the long message below to be split into several datagrams.
		auto logger = new UdpGrayLogger(socket, c, "UDP%s".format(i), LogLevel.all, 512);
		logger.errorf("===== UDP #%s.0 =====", i);
		logger.errorf("===== UDP #%s.1 =====", i);
		logger.errorf("========== UDP #%s.3 ==========\n%s", i,
			"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed vitae nisl scelerisque,
			vestibulum arcu quis, rhoncus leo. Nunc ullamcorper nibh vitae nisl viverra dignissim.
			Etiam dictum tincidunt commodo. Morbi faucibus et ipsum in hendrerit. Phasellus rutrum,
			lacus at auctor tempor, metus nisl suscipit nisi, elementum molestie quam enim nec erat.
			Sed cursus libero felis, in pulvinar neque molestie eget. Praesent pulvinar est vitae sem
			pulvinar, pharetra dignissim velit condimentum.

			Vestibulum laoreet lorem eu dui ornare, ac congue enim consectetur.
			Morbi tincidunt, turpis et egestas sodales, erat velit suscipit felis,
			quis porttitor nulla turpis ut odio. Fusce in faucibus felis, ac feugiat mauris.
			Nullam vel sagittis mi. Nullam eu turpis ullamcorper, porta odio sit amet, dictum lorem.
			Nunc dictum in sem vel pharetra. In consectetur posuere massa, sed convallis felis tempus quis.
			Maecenas eleifend aliquam lectus pretium aliquam. Morbi viverra dui tortor,
			vel laoreet libero accumsan sed. Quisque congue erat quis nisl sed.");
	}
}
91 
/// TCP
version(gelf_test_tcp)
///
unittest
{
	// Open a TCP connection to the test input and emit three error records
	// through a logger that uses the default frame delimiter.
	auto connection = new TcpSocket();
	connection.connect(new InternetAddress("192.168.59.103", 12202));

	auto tcpLog = new TcpGrayLogger(connection, "TCP", LogLevel.all);

	tcpLog.error("===== TCP.0 =====");
	tcpLog.error("===== TCP.1 =====");
	tcpLog.error("===== TCP.2 =====");
}
104 
/// HTTP
version(gelf_test_http)
///
unittest
{
	import std.format: format;
	import std.net.curl;

	// One logger per compression mode: none, default `Compress`, gzip-framed `Compress`.
	auto compressors = [Compress.init, new Compress, new Compress(HeaderFormat.gzip)];
	foreach(index, compressor; compressors)
	{
		auto endpoint = HTTP("192.168.59.103:12204/gelf");
		auto httpLog = new HttpGrayLogger(endpoint, compressor, "HTTP%s".format(index), LogLevel.all);
		httpLog.errorf("===== HTTP #%s =====", index);
	}
}
118 
///
unittest
{
	// The nested functions below are usage examples only; none of them is
	// called, so this unittest performs no network I/O.

	void t_udp()
	{
		// A `null` compressor means messages are sent uncompressed.
		// Alternatives: `new Compress` or `new Compress(HeaderFormat.gzip)`.
		Compress compressor;

		auto socket = new UdpSocket();
		socket.connect(new InternetAddress("192.168.59.103", 12201));

		// The last argument is the UDP chunk size.
		// It is optional and defaults to 8192.
		sharedLog = new UdpGrayLogger(socket, compressor, "YourServiceName", LogLevel.all, 4096);
		error("===== Error Information =====");
	}

	void t_tcp()
	{
		import std.typecons: Yes, No;

		auto socket = new TcpSocket();
		socket.connect(new InternetAddress("192.168.59.103", 12201));

		// `nullDelimeter` defaults to `Yes`.
		// A newline delimiter is used when it is `false`/`No`.
		sharedLog = new TcpGrayLogger(socket, "YourServiceName", LogLevel.all, Yes.nullDelimeter);
		error("===== Error Information =====");
	}

	void t_http()
	{
		import std.net.curl: HTTP;

		// A `null` compressor means messages are sent uncompressed.
		// Alternatives: `new Compress` or `new Compress(HeaderFormat.gzip)`.
		Compress compressor;

		auto endpoint = HTTP("192.168.59.103:12204/gelf");
		sharedLog = new HttpGrayLogger(endpoint, compressor, "YourServiceName", LogLevel.all);
		error("===== Error Information =====");
	}
}
154 
155 public import std.experimental.logger.core;
156 
157 import std.socket;
158 import std.format : formattedWrite;
159 import std.datetime : Date, DateTime, SysTime, UTC;
160 import std.concurrency : Tid;
161 import std.zlib: Compress, HeaderFormat;
162 import core.stdc.errno: errno, EINTR;
163 
/**
HTTP Graylog Logger

Formats each log entry with `fillAppender` and uploads the resulting bytes
through a `std.net.curl.HTTP` handle.
*/
class HttpGrayLogger : GrayLogger
{
	import std.net.curl;

	protected HTTP _http;

	/++
	Params:
		http = HTTP configuration. See `std.net.curl`.
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
	+/
	this(HTTP http, Compress compress, string host, LogLevel v) @trusted
	{
		_http = http;
		super(compress, host, v);
	}

	/// Serializes `payload` and performs one HTTP request carrying it as the body.
	override protected void writeLogMsg(ref LogEntry payload) @trusted
	{
		import std.algorithm.comparison: min;
		import std.typecons: No;

		fillAppender(payload);
		auto pending = _dataAppender.data;
		// The shared buffer is reset once the request has been performed.
		scope(exit) clearAppender;

		// Upload callback: copies as many pending bytes as fit into the
		// buffer offered by curl and reports how many bytes were written.
		size_t feed(void[] buffer)
		{
			immutable count = min(buffer.length, pending.length);
			if (count)
			{
				buffer[0..count] = pending[0..count];
				pending = pending[count .. $];
			}
			return count;
		}

		http.contentLength = pending.length;
		http.onSend = &feed;
		// Transfer errors are reported through the return value, not thrown.
		http.perform(No.throwOnError);
	}

	/// The HTTP handle this logger was constructed with.
	final HTTP http() @property nothrow
	{
		return _http;
	}
}
214 
/**
TCP Graylog Logger

Writes each formatted log entry to a TCP socket, followed by a one-byte
frame delimiter.
*/
class TcpGrayLogger : SocketGrayLogger
{
	import std.typecons: Flag, Yes;

	/// Frame delimiter appended after every message: `"\0"` or `"\n"`.
	private immutable string delim;

	/**
	Graylog TCP connection does not support compression.
	Params:
		socket = remote blocking TCP socket
		host = local service name
		v = log level
		useNull = Use null byte as frame delimiter? Otherwise newline delimiter is used.
	*/
	this(TcpSocket socket, string host, LogLevel v, Flag!"nullDelimeter" useNull = Yes.nullDelimeter) @safe
	{
		delim = useNull ? "\0" : "\n";
		// The compressor is always `null` for TCP.
		super(socket, null, host, v);
	}

	/// Serializes `payload`, appends the delimiter and writes everything to the socket.
	override protected void writeLogMsg(ref LogEntry payload) @trusted
	{
		// Nothing can be delivered through a dead socket: drop the entry.
		if(!socket.isAlive)
			return;

		fillAppender(payload);
		scope(exit) clearAppender;
		_dataAppender.put(cast(ubyte[])delim);

		auto remaining = _dataAppender.data;
		for(;;)
		{
			immutable sent = socket.send(remaining);
			if(sent != Socket.ERROR)
			{
				// Partial write: keep only the bytes that are still unsent.
				remaining = remaining[sent..$];
			}
			else if(errno != EINTR)
			{
				// Failed to send data; give up on this entry.
				break;
			}
			// On EINTR (probably the GC interrupted the call) nothing was
			// consumed, so the same bytes are retried on the next iteration.
			if(!remaining.length)
				break;
		}
	}
}
271 
/**
UDP Graylog Logger

Sends each formatted log entry as one datagram when it fits into the
configured chunk size, otherwise as a sequence of chunk datagrams that each
carry a 12-byte header.
*/
class UdpGrayLogger : SocketGrayLogger
{
	/// Reusable datagram buffer; bytes 0..2 hold the fixed chunk magic `0x1e 0x0f`.
	protected ubyte[] _chunk;
	import std.random;
	import std.datetime;

	/// Generator for the per-message id written into chunk headers.
	Mt19937 gen;

	/**
	Params:
		socket = remote blocking UDP socket
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
		chunk = maximal chunk size (size of UDP datagram)
	Throws:
		`Exception` if `chunk` is less than 512.
	*/
	this(UdpSocket socket, Compress compress, string host, LogLevel v, int chunk = 8192) @safe
	{
		super(socket, compress, host, v);
		if(chunk < 512)
		{
			throw new Exception("chunk must be greater or equal to 512");
		}
		_chunk = new ubyte[chunk];
		_chunk[0] = 0x1e;
		_chunk[1] = 0x0f;
		// Seed from the monotonic clock mixed with a random value scaled by the host hash.
		immutable seed = cast(uint)(MonoTime.currTime.ticks ^ (uniform!size_t * hashOf(host)));
		gen = typeof(gen)(seed);
	}

	/**
	Sends one datagram, retrying while the call is interrupted (`EINTR`).
	Returns: `true` only if the whole of `data` was accepted by the socket.
	*/
	protected bool send(const(void)[] data) @safe
	{
		ptrdiff_t status;
		do
		{
			status = socket.send(data);
		}
		// Probably the GC interrupted the process: try again.
		while(status == Socket.ERROR && errno == EINTR);

		// Any other error means the datagram could not be sent.
		return status != Socket.ERROR && status == data.length;
	}

	/// Serializes `payload` and sends it as a single datagram or as chunks.
	override protected void writeLogMsg(ref LogEntry payload) @trusted
	{
		// Nothing can be delivered through a dead socket: drop the entry.
		if(!socket.isAlive)
			return;

		fillAppender(payload);
		auto bytes = _dataAppender.data;
		scope(exit) clearAppender;

		if(bytes.length <= _chunk.length)
		{
			// Small enough: send all data as a single datagram.
			send(bytes);
			return;
		}

		// Otherwise split the data; each datagram carries 12 header bytes.
		import std.range: chunks;
		enum headerLength = 12;
		auto pieces = bytes.chunks(_chunk.length - headerLength);
		immutable total = pieces.length;
		if(total > 128)
		{
			// Ignore messages that would need more than 128 chunks.
			return;
		}

		// Bytes 2..10: random message id shared by all chunks (endianness does not matter).
		ulong messageId = uniform!ulong(gen);
		_chunk[2 .. 10] = (cast(ubyte*) &messageId)[0 .. ulong.sizeof];
		// Byte 11: total number of chunks.
		_chunk[11] = cast(ubyte) total;

		size_t sequence = 0;
		foreach(piece; pieces)
		{
			// Byte 10: index of this chunk within the message.
			_chunk[10] = cast(ubyte) sequence;
			++sequence;
			immutable end = headerLength + piece.length;
			_chunk[headerLength .. end] = piece[];
			if(!send(_chunk[0 .. end]))
			{
				// Stop at the first datagram that could not be sent.
				return;
			}
		}
	}
}
369 
/**
Abstract Socket Graylog Logger

Common base for the loggers that deliver messages through a `std.socket.Socket`.
*/
abstract class SocketGrayLogger : GrayLogger
{
	/// Socket used by subclasses to deliver messages.
	protected Socket _socket;

	/**
	Params:
		socket = remote blocking socket
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
	Throws:
		`SocketException` if `socket` is not in blocking mode.
	*/
	this(Socket socket, Compress compress, string host, LogLevel v) @safe
	{
		immutable isBlocking = socket.blocking;
		if(!isBlocking)
		{
			throw new SocketException("SocketGrayLogger: socket must be blocking.");
		}
		_socket = socket;
		super(compress, host, v);
	}

	/// The socket this logger was constructed with.
	final Socket socket() @property @safe pure nothrow @nogc
	{
		return _socket;
	}
}
398 
/**
Abstract Graylog Logger

Turns a `LogEntry` into a GELF JSON document (optionally compressed) and
collects the resulting bytes in `_dataAppender`. Subclasses implement
`writeLogMsg` and decide how those bytes are transported.
*/
abstract class GrayLogger : Logger
{
	enum string gelfVersion = "1.1";
	import std.array: appender, Appender;

	protected string _host;
	/// Compressor applied to the JSON document; `null` means no compression.
	protected Compress _compress;
	/// Pre-built beginning of every message, up to and including `"short_message":`.
	protected immutable string _msgStart;
	/// Buffer that holds the bytes of the message currently being sent.
	protected Appender!(ubyte[]) _dataAppender;

	/**
	Params:
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
	*/
	this(Compress compress, string host, LogLevel v) @safe
	{
		_dataAppender = appender!(ubyte[]);
		_host = host;
		_compress = compress;
		// The host is JSON-escaped so that quotes or backslashes in the
		// service name cannot corrupt the document.
		_msgStart = `{"version":"` ~ gelfVersion ~ `","host":` ~ jsonQuote(host) ~ `,"short_message":`;
		super(v);
	}

	/**
	Writes `str` to `sink` as a JSON string literal, including the
	surrounding double quotes.

	`"` and `\` are backslash-escaped, control characters below 0x20 are
	written as `\b`, `\f`, `\n`, `\r`, `\t` or `\u00XX`, and every other
	code unit is passed through unchanged.
	*/
	private static void putJsonString(scope void delegate(const(char)[]) sink, const(char)[] str) @trusted
	{
		static immutable hexDigits = "0123456789abcdef";
		char[6] unicodeEscape = `\u0000`;

		sink(`"`);
		// Index of the first code unit that has not been written yet.
		size_t pending = 0;
		foreach(i, char c; str)
		{
			const(char)[] escaped;
			switch(c)
			{
				case '"':  escaped = `\"`; break;
				case '\\': escaped = `\\`; break;
				case '\b': escaped = `\b`; break;
				case '\f': escaped = `\f`; break;
				case '\n': escaped = `\n`; break;
				case '\r': escaped = `\r`; break;
				case '\t': escaped = `\t`; break;
				default:
					if(c >= 0x20)
						continue; // no escaping needed, keep accumulating
					unicodeEscape[4] = hexDigits[c >> 4];
					unicodeEscape[5] = hexDigits[c & 0x0f];
					escaped = unicodeEscape[];
					break;
			}
			// Flush the unescaped run before the character, then its escape.
			if(pending < i)
				sink(str[pending .. i]);
			sink(escaped);
			pending = i + 1;
		}
		if(pending < str.length)
			sink(str[pending .. $]);
		sink(`"`);
	}

	/// Returns `str` as a quoted and escaped JSON string literal.
	private static string jsonQuote(const(char)[] str) @trusted
	{
		auto buf = appender!string;
		putJsonString((const(char)[] part) { buf.put(part); }, str);
		return buf.data;
	}

	/**
	Writes the GELF JSON document for `payload` to `sink`.

	Params:
		sink = receives the document piece by piece
		payload = log entry to serialize
	*/
	protected void formatMessage(scope void delegate(const(char)[]) sink, ref LogEntry payload) @trusted
	{
		import std.format : formattedWrite;

		sink(_msgStart);
		putJsonString(sink, payload.msg);

		sink(`,"timestamp":`);
		auto time = payload.timestamp.toUTC;
		immutable unixSeconds = time.toUnixTime();
		formattedWrite(sink, "%d", unixSeconds);
		if(immutable msc = time.fracSecs.total!"msecs")
		{
			// The fraction must be zero-padded to three digits:
			// 5 ms is ".005", not ".5".
			formattedWrite(sink, ".%03d", msc);
		}

		sink(`,"level":`);
		uint level;
		final switch(payload.logLevel) with(LogLevel)
		{
			case all:     level = 7; break; // Debug: debug-level messages
			case trace:   level = 6; break; // Informational: informational messages
			case info:    level = 5; break; // Notice: normal but significant condition
			case warning: level = 4; break; // Warning: warning conditions
			case error:   level = 3; break; // Error: error conditions
			case critical:level = 2; break; // Critical: critical conditions
			case fatal:   level = 1; break; // Alert: action must be taken immediately
			case off:     level = 0; break; // Emergency: system is unusable
		}
		formattedWrite(sink, "%d", level);

		sink(`,"line":`);
		formattedWrite(sink, "%d", payload.line);

		// File paths and (pretty) function names may contain backslashes or
		// double quotes (e.g. template string arguments), so all of them are
		// written as escaped JSON strings.
		sink(`,"file":`);
		putJsonString(sink, payload.file);
		sink(`,"_func_name":`);
		putJsonString(sink, payload.funcName);
		sink(`,"_pretty_func_name":`);
		putJsonString(sink, payload.prettyFuncName);
		sink(`,"_module_name":`);
		putJsonString(sink, payload.moduleName);
		sink(`}`);
	}

	/// Serializes `payload` into `_dataAppender`, compressing it when a compressor is set.
	final void fillAppender(ref LogEntry payload) @trusted
	{
		if(_compress)
		{
			formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) _compress.compress(str)); }, payload);
			// Emit whatever the compressor still buffers.
			_dataAppender.put(cast(const(ubyte)[]) _compress.flush);
		}
		else
		{
			formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) str); }, payload);
		}
	}

	/**
	Empties `_dataAppender` so it can be reused for the next message.

	A buffer whose capacity grew beyond 8192 bytes is dropped and replaced by
	a fresh appender instead of being kept. `Appender.shrinkTo` is not used
	here because it only shortens the stored data and throws when asked for a
	length greater than the current one, which is always the case after `clear`.
	*/
	final void clearAppender()
	{
		enum ml = 8192;
		if(_dataAppender.capacity > ml)
		{
			_dataAppender = appender!(ubyte[]);
		}
		else
		{
			_dataAppender.clear;
		}
	}

	/// Local service name passed to the constructor.
	final string host() @property @safe pure nothrow @nogc
	{
		return _host;
	}
}