1 /++
Implementation of the Graylog Extended Log Format (GELF) for `std.experimental.logger`.
3 +/
4 module gelf;
5 
6 //version = gelf_test_udp;
7 //version = gelf_test_tcp;
8 //version = gelf_test_http;
9 
/// UDP
// Manual integration test (enabled with `version = gelf_test_udp`): for every
// compression mode it sends two short messages and one long message to a
// Graylog UDP input.
version(gelf_test_udp)
unittest
{
	import std.format: format;
	// `Compress.init` is `null`, so the first iteration sends uncompressed data.
	foreach(i, c; [Compress.init, new Compress, new Compress(HeaderFormat.gzip)])
	{
		auto socket = new UdpSocket();
		socket.connect(new InternetAddress("192.168.59.103", 12201));
		// Hand the iteration's compressor to the logger (it used to be ignored,
		// so compression was never exercised). The chunk size of 512 forces the
		// long uncompressed message below to be split into several datagrams.
		auto logger = new UdpGrayLogger(socket, c, "UDP%s".format(i), LogLevel.all, 512);
		logger.errorf("===== UDP #%s.0 =====", i);
		logger.errorf("===== UDP #%s.1 =====", i);
		logger.errorf("========== UDP #%s.3 ==========\n%s", i,
			"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed vitae nisl scelerisque,
			vestibulum arcu quis, rhoncus leo. Nunc ullamcorper nibh vitae nisl viverra dignissim.
			Etiam dictum tincidunt commodo. Morbi faucibus et ipsum in hendrerit. Phasellus rutrum,
			lacus at auctor tempor, metus nisl suscipit nisi, elementum molestie quam enim nec erat.
			Sed cursus libero felis, in pulvinar neque molestie eget. Praesent pulvinar est vitae sem
			pulvinar, pharetra dignissim velit condimentum.

			Vestibulum laoreet lorem eu dui ornare, ac congue enim consectetur.
			Morbi tincidunt, turpis et egestas sodales, erat velit suscipit felis,
			quis porttitor nulla turpis ut odio. Fusce in faucibus felis, ac feugiat mauris.
			Nullam vel sagittis mi. Nullam eu turpis ullamcorper, porta odio sit amet, dictum lorem.
			Nunc dictum in sem vel pharetra. In consectetur posuere massa, sed convallis felis tempus quis.
			Maecenas eleifend aliquam lectus pretium aliquam. Morbi viverra dui tortor,
			vel laoreet libero accumsan sed. Quisque congue erat quis nisl sed.");
	}
}
39 
/// TCP
// Manual integration test (enabled with `version = gelf_test_tcp`): sends
// three error messages to a Graylog TCP input.
version(gelf_test_tcp)
unittest
{
	auto connection = new TcpSocket();
	connection.connect(new InternetAddress("192.168.59.103", 12202));
	auto tcpLogger = new TcpGrayLogger(connection, "TCP", LogLevel.all);
	foreach(text; ["===== TCP.0 =====", "===== TCP.1 =====", "===== TCP.2 ====="])
	{
		tcpLogger.error(text);
	}
}
51 
/// HTTP
// Manual integration test (enabled with `version = gelf_test_http`): sends one
// error message per compression mode to a Graylog HTTP input.
version(gelf_test_http)
unittest
{
	import std.format: format;
	import std.net.curl;
	// `Compress.init` is `null`, i.e. no compression for the first logger.
	auto compressors = [Compress.init, new Compress, new Compress(HeaderFormat.gzip)];
	foreach(index, compressor; compressors)
	{
		auto serviceName = "HTTP%s".format(index);
		auto httpLogger = new HttpGrayLogger(
			HTTP("192.168.59.103:12204/gelf"), compressor, serviceName, LogLevel.all);
		httpLogger.errorf("===== HTTP #%s =====", index);
	}
}
64 
// Usage examples. The nested functions are only compiled, never called,
// so this unittest needs no running Graylog server.
unittest
{
	void t_udp()
	{
		// A `null` compressor means "send without compression".
		Compress compressor;
		// compressor = new Compress;
		// compressor = new Compress(HeaderFormat.gzip);
		auto udp = new UdpSocket();
		udp.connect(new InternetAddress("192.168.59.103", 12201));
		// The last argument is the UDP chunk size. It is an optional parameter whose default value is 8192.
		sharedLog = new UdpGrayLogger(udp, compressor, "YourServiceName", LogLevel.all, 4096);
		error("===== Error Information =====");
	}

	void t_tcp()
	{
		import std.typecons: Yes, No;
		auto tcp = new TcpSocket();
		tcp.connect(new InternetAddress("192.168.59.103", 12201));
		/+ The default value for nullDelimeter is `Yes`. A newline delimiter is used when nullDelimeter is `false`/`No`. +/
		sharedLog = new TcpGrayLogger(tcp, "YourServiceName", LogLevel.all, Yes.nullDelimeter);
		error("===== Error Information =====");
	}

	void t_http()
	{
		import std.net.curl: HTTP;
		// A `null` compressor means "send without compression".
		Compress compressor;
		// compressor = new Compress;
		// compressor = new Compress(HeaderFormat.gzip);
		auto endpoint = HTTP("192.168.59.103:12204/gelf");
		sharedLog = new HttpGrayLogger(endpoint, compressor, "YourServiceName", LogLevel.all);
		error("===== Error Information =====");
	}
}
99 
100 public import std.experimental.logger.core;
101 
102 import std.socket;
103 import std.format : formattedWrite;
104 import std.datetime : Date, DateTime, SysTime, UTC;
105 import std.concurrency : Tid;
106 import std.zlib: Compress, HeaderFormat;
107 import core.stdc.errno: errno, EINTR;
108 
/++
Graylog logger that delivers every message through an HTTP request
performed with `std.net.curl`.
+/
class HttpGrayLogger : GrayLogger
{
	import std.net.curl;

	protected HTTP _http;

	/++
	Params:
		http = HTTP configuration (target URL and options). See `std.net.curl`.
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
	+/
	this(HTTP http, Compress compress, string host, LogLevel v) @trusted
	{
		_http = http;
		super(compress, host, v);
	}

	/++
	Serializes `payload` into the internal buffer and uploads it as the request body.
	Transfer errors are not thrown (`No.throwOnError`).
	+/
	override protected void writeLogMsg(ref LogEntry payload) @trusted
	{
		import std.algorithm.comparison: min;
		import std.typecons: No;

		fillAppender(payload);
		// Not-yet-uploaded part of the serialized message.
		auto pending = _dataAppender.data;
		scope(exit) clearAppender;

		// Upload callback: copies as much of `pending` as fits into the
		// transfer buffer and returns the number of bytes copied.
		size_t feed(void[] buffer)
		{
			immutable count = min(buffer.length, pending.length);
			if(count == 0)
			{
				return count;
			}
			buffer[0..count] = pending[0..count];
			pending = pending[count .. $];
			return count;
		}

		http.contentLength = pending.length;
		http.onSend = &feed;
		http.perform(No.throwOnError);
	}

	/// Returns: the HTTP configuration passed to the constructor.
	final HTTP http() @property nothrow
	{
		return _http;
	}
}
159 
/++
Graylog logger that writes delimiter-terminated messages to a TCP socket.
+/
class TcpGrayLogger : SocketGrayLogger
{
	import std.typecons: Flag, Yes;

	// Frame delimiter appended to every message: "\0" or "\n".
	private immutable string delim;

	/++
	Graylog TCP connection does not support compression,
	so messages are always sent uncompressed.
	Params:
		socket = remote blocking TCP socket
		host = local service name
		v = log level
		useNull = use a null byte as the frame delimiter? Otherwise a newline is used.
	+/
	this(TcpSocket socket, string host, LogLevel v, Flag!"nullDelimeter" useNull = Yes.nullDelimeter) @safe
	{
		delim = useNull ? "\0" : "\n";
		super(socket, null, host, v);
	}

	/++
	Serializes `payload`, appends the frame delimiter and writes everything to the socket.
	Nothing is sent when the socket is not alive; a send failure drops the rest of the message.
	+/
	override protected void writeLogMsg(ref LogEntry payload) @trusted
	{
		immutable alive = socket.isAlive;
		if(!alive)
		{
			// Dead socket: silently drop the message.
			return;
		}
		fillAppender(payload);
		scope(exit) clearAppender;
		_dataAppender.put(cast(ubyte[])delim);
		auto remaining = _dataAppender.data;
		for(;;)
		{
			immutable sent = socket.send(remaining);
			if(sent != Socket.ERROR)
			{
				// Partial write: keep only the part that still has to go out.
				remaining = remaining[sent..$];
			}
			else if(errno != EINTR)
			{
				// Real send failure: give up on this message.
				break;
			}
			// else: the call was interrupted (probably by the GC) - try again.
			if(remaining.length == 0)
			{
				break;
			}
		}
	}
}
215 
/++
Graylog logger that sends messages as UDP datagrams and splits
messages larger than the configured chunk size into several datagrams.
+/
class UdpGrayLogger : SocketGrayLogger
{
	// Reusable datagram buffer; bytes 0 and 1 are set once in the constructor.
	protected ubyte[] _chunk;
	import std.random;
	import std.datetime;

	// Random generator used to draw the id shared by all chunks of one message.
	Mt19937 gen;

	/++
	Params:
		socket = remote blocking UDP socket
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
		chunk = maximal chunk size (size of UDP datagram), at least 512
	Throws: `Exception` if `chunk` is less than 512.
	+/
	this(UdpSocket socket, Compress compress, string host, LogLevel v, int chunk = 8192) @safe
	{
		super(socket, compress, host, v);
		if(chunk < 512)
		{
			throw new Exception("chunk must be greater or equal to 512");
		}
		_chunk = new ubyte[chunk];
		// Two fixed leading bytes of every chunked datagram.
		_chunk[0] = 0x1e;
		_chunk[1] = 0x0f;
		// Seed from the monotonic clock mixed with a random value and the host hash.
		immutable seed = cast(uint)(MonoTime.currTime.ticks ^ (uniform!size_t * hashOf(host)));
		gen = Mt19937(seed);
	}

	/++
	Sends one datagram, retrying while the call is interrupted (`EINTR`).
	Returns: `true` if the whole `data` was sent, `false` otherwise.
	+/
	protected bool send(const(void)[] data) @safe
	{
		while(true)
		{
			immutable sent = socket.send(data);
			if(sent != Socket.ERROR)
			{
				return sent == data.length;
			}
			if(errno != EINTR)
			{
				// Failed to send the datagram: report it, the caller decides.
				return false;
			}
			// Interrupted (probably by the GC): try again.
		}
	}

	/++
	Serializes `payload` and sends it as one datagram, or as a sequence of
	chunk datagrams when it does not fit into `_chunk`.
	Messages that would need more than 128 chunks are dropped, and so is
	everything when the socket is not alive.
	+/
	override protected void writeLogMsg(ref LogEntry payload) @trusted
	{
		if(!socket.isAlive)
		{
			// Dead socket: silently drop the message.
			return;
		}
		fillAppender(payload);
		auto data = _dataAppender.data;
		scope(exit) clearAppender;

		if(data.length <= _chunk.length)
		{
			// Small enough: the message goes out as a single plain datagram.
			send(data);
			return;
		}

		import std.range: chunks, enumerate;
		// Every chunk datagram starts with a 12-byte header:
		// 2 fixed bytes, 8-byte message id, chunk index, chunk count.
		enum headerLength = 12;
		auto parts = (cast(ubyte[])data).chunks(_chunk.length - headerLength);
		immutable count = parts.length;
		if(count > 128)
		{
			// Too many chunks: ignore the huge message.
			return;
		}
		// The id only has to be the same for all chunks, so endianness does not matter.
		ulong[1] id = void;
		id[0] = uniform!ulong(gen);
		_chunk[2..10] = cast(ubyte[]) id;
		_chunk[11] = cast(ubyte) count;
		foreach(index, part; parts.enumerate)
		{
			_chunk[10] = cast(ubyte) index;
			immutable end = headerLength + part.length;
			_chunk[headerLength .. end] = part[];
			if(!send(_chunk[0 .. end]))
			{
				// Stop at the first datagram that could not be sent.
				return;
			}
		}
	}
}
313 
/++
Base class for Graylog loggers that deliver messages through a `Socket`.
+/
abstract class SocketGrayLogger : GrayLogger
{
	protected Socket _socket;

	/++
	Params:
		socket = remote socket; it must be in blocking mode
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
	Throws: `SocketException` if `socket` is not blocking.
	+/
	this(Socket socket, Compress compress, string host, LogLevel v) @safe
	{
		immutable isBlocking = socket.blocking;
		if(isBlocking == false)
		{
			throw new SocketException("SocketGrayLogger: socket must be blocking.");
		}
		_socket = socket;
		super(compress, host, v);
	}

	/// Returns: the socket passed to the constructor.
	final Socket socket() @property @safe pure nothrow @nogc
	{
		return _socket;
	}
}
341 
/++
Abstract Graylog Logger.

Serializes a `LogEntry` into a GELF JSON object, optionally compresses it,
and collects the bytes in an internal buffer that subclasses send from
their `writeLogMsg` implementation.
+/
abstract class GrayLogger : Logger
{
	enum string gelfVersion = "1.1";
	import std.array: appender, Appender;

	protected string _host;
	protected Compress _compress;
	// Constant beginning of every message, up to and including `"short_message":`.
	protected immutable string _msgStart;
	// Buffer holding the serialized (and possibly compressed) message.
	protected Appender!(ubyte[]) _dataAppender;

	/++
	Params:
		compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
		host = local service name
		v = log level
	+/
	this(Compress compress, string host, LogLevel v) @safe
	{
		_dataAppender = appender!(ubyte[]);
		_host = host;
		_compress = compress;
		// The host is embedded into a JSON string literal, so it has to be escaped.
		string escapedHost;
		putJsonEscaped((const(char)[] part) { escapedHost ~= part; }, host);
		_msgStart = `{"version":"` ~ gelfVersion ~ `","host":"` ~ escapedHost ~ `","short_message":`;
		super(v);
	}

	/++
	Writes the contents of a JSON string literal (without the surrounding quotes) to `sink`.

	Quotes, backslashes and control characters below 0x20 are escaped;
	every other code unit is passed through unchanged.
	+/
	private static void putJsonEscaped(scope void delegate(const(char)[]) sink, const(char)[] str) @trusted
	{
		import std.format : formattedWrite;

		// Start of the pending run of characters that need no escaping.
		size_t start = 0;
		foreach(i, char c; str)
		{
			string esc;
			switch(c)
			{
				case '"':  esc = `\"`; break;
				case '\\': esc = `\\`; break;
				case '\b': esc = `\b`; break;
				case '\f': esc = `\f`; break;
				case '\n': esc = `\n`; break;
				case '\r': esc = `\r`; break;
				case '\t': esc = `\t`; break;
				default:
					if(c >= 0x20)
					{
						// Ordinary character: it stays in the pending run.
						continue;
					}
					// Other control characters use the generic \u00XX form below.
					break;
			}
			if(start < i)
			{
				sink(str[start .. i]);
			}
			if(esc.length)
			{
				sink(esc);
			}
			else
			{
				sink.formattedWrite(`\u%04x`, cast(uint) c);
			}
			start = i + 1;
		}
		if(start < str.length)
		{
			sink(str[start .. $]);
		}
	}

	/++
	Serializes `payload` as one GELF JSON object and passes it to `sink` piece by piece.

	Params:
		sink = receiver of the JSON text fragments
		payload = log entry to serialize
	+/
	protected void formatMessage(scope void delegate(const(char)[]) sink, ref LogEntry payload) @trusted
	{
		import std.format : formattedWrite;

		sink(_msgStart);
		sink(`"`);
		putJsonEscaped(sink, payload.msg);
		sink(`"`);

		sink(`,"timestamp":`);
		auto time = payload.timestamp.toUTC;
		sink.formattedWrite("%s", time.toUnixTime);
		if(immutable msc = time.fracSecs.total!"msecs")
		{
			// Milliseconds form the decimal fraction of the Unix time, so they
			// must be zero-padded to three digits (5 ms is ".005", not ".5").
			sink.formattedWrite(".%03d", msc);
		}

		sink(`,"level":`);
		uint level = void;
		final switch(payload.logLevel) with(LogLevel)
		{
			case all:     level = 7; break; // Debug: debug-level messages
			case trace:   level = 6; break; // Informational: informational messages
			case info:    level = 5; break; // Notice: normal but significant condition
			case warning: level = 4; break; // Warning: warning conditions
			case error:   level = 3; break; // Error: error conditions
			case critical:level = 2; break; // Critical: critical conditions
			case fatal:   level = 1; break; // Alert: action must be taken immediately
			case off:     level = 0; break; // Emergency: system is unusable
		}
		sink.formattedWrite("%s", level);

		sink(`,"line":`);
		sink.formattedWrite("%s", payload.line);
		// File paths and pretty function names can contain backslashes or
		// quotes, so all textual fields are escaped.
		sink(`,"file":"`);
		putJsonEscaped(sink, payload.file);
		sink(`","_func_name":"`);
		putJsonEscaped(sink, payload.funcName);
		sink(`","_pretty_func_name":"`);
		putJsonEscaped(sink, payload.prettyFuncName);
		sink(`","_module_name":"`);
		putJsonEscaped(sink, payload.moduleName);
		sink(`"}`);
	}

	/++
	Serializes `payload` into the internal buffer.
	When a compressor was supplied, every fragment is passed through it and
	the compressor is flushed at the end.
	+/
	final void fillAppender(ref LogEntry payload) @trusted
	{
		if(_compress)
		{
			formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) _compress.compress(str)); }, payload);
			_dataAppender.put(cast(const(ubyte)[]) _compress.flush);
		}
		else
		{
			formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) str); }, payload);
		}
	}

	/++
	Empties the internal buffer so it can be reused for the next message.
	A buffer that has grown beyond 8192 bytes is dropped instead of reused,
	so one huge message does not keep its memory pinned.
	+/
	final void clearAppender()
	{
		enum ml = 8192;
		if(_dataAppender.capacity > ml)
		{
			// `shrinkTo` cannot be used here: after `clear` the length is 0 and
			// `shrinkTo(ml)` would throw. Start over with a fresh appender.
			_dataAppender = appender!(ubyte[]);
		}
		else
		{
			_dataAppender.clear;
		}
	}

	/// Returns: the local service name passed to the constructor.
	final string host() @property @safe pure nothrow @nogc
	{
		return _host;
	}
}