/**
Implementation of [Graylog Extended Logging Format](http://docs.graylog.org/en/latest/pages/gelf.html) for `std.experimental.logger`.
Generated documents at http://gelf.code.kaleidic.io

## Features
 - Small and flexible API
 - `GrayLogger` does not throw exceptions while it is sending a message
 - UDP, TCP, and HTTP transports are supported
*/

/**
Example:
---
// UDP
Compress c; // `null` value is for no compression
// c = new Compress;
// c = new Compress(HeaderFormat.gzip);
auto socket = new UdpSocket();
socket.connect(new InternetAddress("192.168.59.103", 12201));
// The last param is the UDP chunk size. It is an optional parameter with a default value of 8192.
sharedLog = new UdpGrayLogger(socket, c, "YourServiceName", LogLevel.all, 4096);

error("===== Error Information =====");
---
*/

/**
Example:
---
// TCP
import std.typecons: Yes, No;
auto socket = new TcpSocket();
socket.connect(new InternetAddress("192.168.59.103", 12201));
// Default value for nullDelimeter is `Yes`.
// Newline delimiter would be used if nullDelimeter is `false`/`No`.
sharedLog = new TcpGrayLogger(socket, "YourServiceName", LogLevel.all, Yes.nullDelimeter);

error("===== Error Information =====");
---
*/

/**
Example:
---
// HTTP
import std.net.curl: HTTP;
Compress c; // `null` value is for no compression
// c = new Compress;
// c = new Compress(HeaderFormat.gzip);
sharedLog = new HttpGrayLogger(HTTP("192.168.59.103:12201/gelf"), c, "YourServiceName", LogLevel.all);

error("===== Error Information =====");
---
*/

module gelf;

//version = gelf_test_udp;
//version = gelf_test_tcp;
//version = gelf_test_http;

/// UDP
version(gelf_test_udp)
///
unittest
{
    import std.format: format;
    foreach(i, c; [Compress.init, new Compress, new Compress(HeaderFormat.gzip)])
    {
        auto socket = new UdpSocket();
        socket.connect(new InternetAddress("192.168.59.103", 12201));
        // Pass the compressor under test so that every variant is exercised.
        auto logger = new UdpGrayLogger(socket, c, "UDP%s".format(i), LogLevel.all, 512);
        logger.errorf("===== UDP #%s.0 =====", i);
        logger.errorf("===== UDP #%s.1 =====", i);
        logger.errorf("========== UDP #%s.3 ==========\n%s", i,
            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed vitae nisl scelerisque,
            vestibulum arcu quis, rhoncus leo. Nunc ullamcorper nibh vitae nisl viverra dignissim.
            Etiam dictum tincidunt commodo. Morbi faucibus et ipsum in hendrerit. Phasellus rutrum,
            lacus at auctor tempor, metus nisl suscipit nisi, elementum molestie quam enim nec erat.
            Sed cursus libero felis, in pulvinar neque molestie eget. Praesent pulvinar est vitae sem
            pulvinar, pharetra dignissim velit condimentum.

            Vestibulum laoreet lorem eu dui ornare, ac congue enim consectetur.
            Morbi tincidunt, turpis et egestas sodales, erat velit suscipit felis,
            quis porttitor nulla turpis ut odio. Fusce in faucibus felis, ac feugiat mauris.
            Nullam vel sagittis mi. Nullam eu turpis ullamcorper, porta odio sit amet, dictum lorem.
            Nunc dictum in sem vel pharetra. In consectetur posuere massa, sed convallis felis tempus quis.
            Maecenas eleifend aliquam lectus pretium aliquam. Morbi viverra dui tortor,
            vel laoreet libero accumsan sed. Quisque congue erat quis nisl sed.");
    }
}

/// TCP
version(gelf_test_tcp)
///
unittest
{
    auto socket = new TcpSocket();
    socket.connect(new InternetAddress("192.168.59.103", 12202));
    auto logger = new TcpGrayLogger(socket, "TCP", LogLevel.all);
    logger.error("===== TCP.0 =====");
    logger.error("===== TCP.1 =====");
    logger.error("===== TCP.2 =====");
}

/// HTTP
version(gelf_test_http)
///
unittest
{
    import std.format: format;
    import std.net.curl;
    foreach(i, c; [Compress.init, new Compress, new Compress(HeaderFormat.gzip)])
    {
        auto logger = new HttpGrayLogger(HTTP("192.168.59.103:12204/gelf"), c, "HTTP%s".format(i), LogLevel.all);
        logger.errorf("===== HTTP #%s =====", i);
    }
}

///
unittest
{
    // The nested functions below are compiled but never called:
    // they only check that the documented examples stay valid.
    void t_udp()
    {
        Compress c; //`null` value for no compression
        // c = new Compress;
        // c = new Compress(HeaderFormat.gzip);
        auto socket = new UdpSocket();
        socket.connect(new InternetAddress("192.168.59.103", 12201));
        // The last param is the UDP chunk size. It is an optional parameter with a default value of 8192.
        sharedLog = new UdpGrayLogger(socket, c, "YourServiceName", LogLevel.all, 4096);
        error("===== Error Information =====");
    }

    void t_tcp()
    {
        import std.typecons: Yes, No;
        auto socket = new TcpSocket();
        socket.connect(new InternetAddress("192.168.59.103", 12201));
        /+Default value for nullDelimeter is `Yes`.
        Newline delimiter would be used if nullDelimeter is `false`/`No`.+/
        sharedLog = new TcpGrayLogger(socket, "YourServiceName", LogLevel.all, Yes.nullDelimeter);
        error("===== Error Information =====");
    }

    void t_http()
    {
        import std.net.curl: HTTP;
        Compress c; //`null` value for no compression
        // c = new Compress;
        // c = new Compress(HeaderFormat.gzip);
        sharedLog = new HttpGrayLogger(HTTP("192.168.59.103:12204/gelf"), c, "YourServiceName", LogLevel.all);
        error("===== Error Information =====");
    }
}

public import std.experimental.logger.core;

import std.socket;
import std.format : formattedWrite;
import std.datetime : Date, DateTime, SysTime, UTC;
import std.concurrency : Tid;
import std.zlib: Compress, HeaderFormat;
import core.stdc.errno: errno, EINTR;

// JSON escape sequences for the control characters U+0000 .. U+001F.
// Built at compile time: short forms where JSON defines them, `\u00XX` otherwise.
private immutable string[32] controlEscapes = ()
{
    enum hexDigits = "0123456789abcdef";
    string[32] table;
    foreach (i; 0 .. 32)
        table[i] = `\u00` ~ hexDigits[i >> 4] ~ hexDigits[i & 0x0F];
    table['\b'] = `\b`;
    table['\f'] = `\f`;
    table['\n'] = `\n`;
    table['\r'] = `\r`;
    table['\t'] = `\t`;
    return table;
}();

// Writes `text` to `sink` as the body of a JSON string (without the surrounding quotes).
// Quotes, backslashes and control characters are escaped; everything else is passed
// through unchanged in as few `sink` calls as possible.
private void putJsonEscaped(Sink)(Sink sink, const(char)[] text)
{
    size_t pending = 0; // start of the not yet written, escape-free run
    foreach (i, char ch; text)
    {
        string esc;
        if (ch == '"')
            esc = `\"`;
        else if (ch == '\\')
            esc = `\\`;
        else if (ch < 0x20)
            esc = controlEscapes[ch];
        else
            continue;
        if (pending < i)
            sink(text[pending .. i]);
        sink(esc);
        pending = i + 1;
    }
    if (pending < text.length)
        sink(text[pending .. $]);
}

/**
HTTP Graylog Logger
*/
class HttpGrayLogger : GrayLogger
{
    import std.net.curl;

    protected HTTP _http;

    /++
    Params:
        http = HTTP configuration. See `std.net.curl`.
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
    +/
    this(HTTP http, Compress compress, string host, LogLevel v) @trusted
    {
        _http = http;
        super(compress, host, v);
    }

    /// Serializes the entry and sends it as the body of a single HTTP request.
    override protected void writeLogMsg(ref LogEntry payload) @trusted
    {
        fillAppender(payload);
        auto msg = _dataAppender.data;
        scope(exit) clearAppender;
        http.contentLength = msg.length;
        // curl pulls the body through this callback, possibly in several pieces.
        http.onSend =
            (void[] data)
            {
                import std.algorithm.comparison: min;
                immutable len = min(data.length, msg.length);
                if (len)
                {
                    data[0..len] = msg[0..len];
                    msg = msg[len .. $];
                }
                return len;
            };
        import std.typecons: No;
        // Transport failures are deliberately ignored: logging must not throw.
        http.perform(No.throwOnError);
    }

    /// The HTTP configuration passed to the constructor.
    final HTTP http() @property nothrow
    {
        return _http;
    }
}

/**
TCP Graylog Logger
*/
class TcpGrayLogger : SocketGrayLogger
{
    import std.typecons: Flag, Yes;

    private immutable string delim;

    /**
    Graylog TCP connection does not support compression.
    Params:
        socket = remote blocking TCP socket
        host = local service name
        v = log level
        useNull = Use null byte as frame delimiter? Otherwise newline delimiter is used.
    */
    this(TcpSocket socket, string host, LogLevel v, Flag!"nullDelimeter" useNull = Yes.nullDelimeter) @safe
    {
        delim = useNull ? "\0" : "\n";
        super(socket, null, host, v);
    }

    /// Serializes the entry, appends the frame delimiter and writes everything to the socket.
    override protected void writeLogMsg(ref LogEntry payload) @trusted
    {
        if(!socket.isAlive)
        {
            // The socket is dead.
            // Do nothing
            return;
        }
        fillAppender(payload);
        scope(exit) clearAppender;
        _dataAppender.put(cast(ubyte[])delim);
        auto data = _dataAppender.data;
        // `send` may accept only a part of the buffer, so loop until it is drained.
        do
        {
            immutable status = socket.send(data);
            if(status == Socket.ERROR)
            {
                if(errno == EINTR)
                {
                    // Probably the GC interrupted the process.
                    // Try again
                    continue;
                }
                // Failed to send data
                // Do nothing
                break;
            }
            data = data[status..$];
        }
        while(data.length);
    }
}

/**
UDP Graylog Logger
*/
class UdpGrayLogger : SocketGrayLogger
{
    protected ubyte[] _chunk;
    import std.random;
    import std.datetime;

    Mt19937 gen;

    /**
    Params:
        socket = remote blocking UDP socket
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
        chunk = maximal chunk size (size of UDP datagram)
    Throws: `Exception` if `chunk` is less than 512.
    */
    this(UdpSocket socket, Compress compress, string host, LogLevel v, int chunk = 8192) @safe
    {
        super(socket, compress, host, v);
        if(chunk < 512)
            throw new Exception("chunk must be greater or equal to 512");
        _chunk = new ubyte[chunk];
        // GELF chunk magic bytes; they never change, so they are written once.
        _chunk[0] = 0x1e;
        _chunk[1] = 0x0f;
        gen = typeof(gen)(cast(uint)(MonoTime.currTime.ticks ^ (uniform!size_t * hashOf(host))));
    }

    /// Sends one datagram. Returns: `true` if the whole datagram was accepted by the socket.
    protected bool send(const(void)[] data) @safe
    {
        for(;;)
        {
            immutable status = socket.send(data);
            if(status == Socket.ERROR)
            {
                if(errno == EINTR)
                {
                    // Probably the GC interrupted the process.
                    // Try again
                    continue;
                }
                // Failed to send the datagram.
                // Do nothing
                return false;
            }
            return status == data.length;
        }
    }

    /// Serializes the entry and sends it as one datagram or as a series of GELF chunks.
    override protected void writeLogMsg(ref LogEntry payload) @trusted
    {
        if(!socket.isAlive)
        {
            // The socket is dead.
            // Do nothing
            return;
        }
        fillAppender(payload);
        auto data = _dataAppender.data;
        scope(exit) clearAppender;
        if(data.length <= _chunk.length)
        {
            // send all data as single datagram
            send(data);
        }
        else
        {
            // send all data by chunks
            import std.range: chunks, enumerate;
            // 12 bytes of every datagram are taken by the chunk header:
            // magic (2) + message id (8) + sequence number (1) + sequence count (1).
            auto chs = (cast(ubyte[])data).chunks(_chunk.length - 12);
            immutable len = chs.length;
            if(len > 128)
            {
                // ignore huge msg
                return;
            }
            ulong[1] id = void;
            id[0] = uniform!ulong(gen);
            _chunk[2..10] = cast(ubyte[]) id;
            _chunk[11] = cast(ubyte) len;
            foreach(i, ch; chs.enumerate)
            {
                //Endianness does not matter
                _chunk[10] = cast(ubyte) i;
                immutable datagramLength = 12 + ch.length;
                _chunk[12 .. datagramLength] = ch[];
                immutable success = send(_chunk[0 .. datagramLength]);
                if(!success)
                {
                    return;
                }
            }
        }
    }
}

/**
Abstract Socket Graylog Logger
*/
abstract class SocketGrayLogger : GrayLogger
{
    protected Socket _socket;

    /**
    Params:
        socket = remote blocking socket
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
    Throws: `SocketException` if the socket is not blocking.
    */
    this(Socket socket, Compress compress, string host, LogLevel v) @safe
    {
        if(!socket.blocking)
            throw new SocketException("SocketGrayLogger: socket must be blocking.");
        _socket = socket;
        super(compress, host, v);
    }

    /// The socket passed to the constructor.
    final Socket socket() @property @safe pure nothrow @nogc
    {
        return _socket;
    }
}

/**
Abstract Graylog Logger
*/
abstract class GrayLogger : Logger
{
    enum string gelfVersion = "1.1";
    import std.array: appender, Appender;

    protected string _host;
    protected Compress _compress;
    protected immutable string _msgStart;
    protected Appender!(ubyte[]) _dataAppender;

    /**
    Params:
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
    */
    this(Compress compress, string host, LogLevel v) @safe
    {
        _dataAppender = appender!(ubyte[]);
        _host = host;
        _compress = compress;
        // The host is embedded into a JSON string, so it has to be escaped once here.
        char[] escapedHost;
        putJsonEscaped((const(char)[] part) { escapedHost ~= part; }, host);
        _msgStart = `{"version":"` ~ gelfVersion ~ `","host":"` ~ escapedHost.idup ~ `","short_message":`;
        super(v);
    }

    /**
    Writes the GELF JSON document for `payload` to `sink`.
    Params:
        sink = receives the document piece by piece
        payload = log entry to serialize
    */
    protected void formatMessage(scope void delegate(const(char)[]) sink, ref LogEntry payload) @trusted
    {
        import std.format;
        FormatSpec!char fmt;

        sink(_msgStart);
        sink(`"`);
        putJsonEscaped(sink, payload.msg);
        sink(`"`);

        sink(`,"timestamp":`);
        //auto time = payload.timestamp;
        auto time = payload.timestamp.toUTC;
        sink.formatValue(time.toUnixTime, fmt);
        if(immutable msc = time.fracSecs.total!"msecs")
        {
            // The fraction must have exactly three digits: 5 ms is ".005", not ".5".
            sink(".");
            if(msc < 100)
                sink("0");
            if(msc < 10)
                sink("0");
            sink.formatValue(msc, fmt);
        }

        sink(`,"level":`);
        uint level = void;
        final switch(payload.logLevel) with(LogLevel)
        {
            case all:     level = 7; break; // Debug: debug-level messages
            case trace:   level = 6; break; // Informational: informational messages
            case info:    level = 5; break; // Notice: normal but significant condition
            case warning: level = 4; break; // Warning: warning conditions
            case error:   level = 3; break; // Error: error conditions
            case critical:level = 2; break; // Critical: critical conditions
            case fatal:   level = 1; break; // Alert: action must be taken immediately
            case off:     level = 0; break; // Emergency: system is unusable
        }
        sink.formatValue(level, fmt);

        sink(`,"line":`);
        sink.formatValue(payload.line, fmt);
        // File paths and function names may contain backslashes or quotes
        // (Windows paths, template string arguments), so they are escaped too.
        sink(`,"file":"`);
        putJsonEscaped(sink, payload.file);
        sink(`","_func_name":"`);
        putJsonEscaped(sink, payload.funcName);
        sink(`","_pretty_func_name":"`);
        putJsonEscaped(sink, payload.prettyFuncName);
        sink(`","_module_name":"`);
        putJsonEscaped(sink, payload.moduleName);
        sink(`"}`);
    }

    /// Serializes `payload` into the internal buffer, compressing it if a compressor is set.
    final void fillAppender(ref LogEntry payload) @trusted
    {
        if(_compress)
        {
            formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) _compress.compress(str)); }, payload);
            _dataAppender.put(cast(const(ubyte)[]) _compress.flush);
        }
        else
        {
            formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) str); }, payload);
        }
    }

    /// Empties the internal buffer and releases it if it has grown beyond 8192 bytes.
    final void clearAppender()
    {
        enum ml = 8192;
        if(_dataAppender.capacity > ml)
        {
            // `shrinkTo` cannot be used here: it only shortens the data and throws
            // when asked for a length above the current one (which is 0 after `clear`).
            // Drop the oversized buffer and start again with a modest one.
            _dataAppender = appender!(ubyte[]);
            _dataAppender.reserve(ml);
        }
        else
        {
            _dataAppender.clear;
        }
    }

    /// The local service name passed to the constructor.
    final string host() @property @safe pure nothrow @nogc
    {
        return _host;
    }
}