/++
Implementation of Graylog Extended Logging Format for `std.experimental.logger`.
+/
module gelf;

//version = gelf_test_udp;
//version = gelf_test_tcp;
//version = gelf_test_http;

/// UDP
version(gelf_test_udp)
unittest
{
    import std.format: format;
    foreach(i, c; [Compress.init, new Compress, new Compress(HeaderFormat.gzip)])
    {
        auto socket = new UdpSocket();
        socket.connect(new InternetAddress("192.168.59.103", 12201));
        // Pass the loop's compressor so that every compression mode is exercised.
        auto logger = new UdpGrayLogger(socket, c, "UDP%s".format(i), LogLevel.all, 512);
        logger.errorf("===== UDP #%s.0 =====", i);
        logger.errorf("===== UDP #%s.1 =====", i);
        logger.errorf("========== UDP #%s.3 ==========\n%s", i,
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed vitae nisl scelerisque,
vestibulum arcu quis, rhoncus leo. Nunc ullamcorper nibh vitae nisl viverra dignissim.
Etiam dictum tincidunt commodo. Morbi faucibus et ipsum in hendrerit. Phasellus rutrum,
lacus at auctor tempor, metus nisl suscipit nisi, elementum molestie quam enim nec erat.
Sed cursus libero felis, in pulvinar neque molestie eget. Praesent pulvinar est vitae sem
pulvinar, pharetra dignissim velit condimentum.

Vestibulum laoreet lorem eu dui ornare, ac congue enim consectetur.
Morbi tincidunt, turpis et egestas sodales, erat velit suscipit felis,
quis porttitor nulla turpis ut odio. Fusce in faucibus felis, ac feugiat mauris.
Nullam vel sagittis mi. Nullam eu turpis ullamcorper, porta odio sit amet, dictum lorem.
Nunc dictum in sem vel pharetra. In consectetur posuere massa, sed convallis felis tempus quis.
Maecenas eleifend aliquam lectus pretium aliquam. Morbi viverra dui tortor,
vel laoreet libero accumsan sed.
Quisque congue erat quis nisl sed.");
    }
}

/// TCP
version(gelf_test_tcp)
unittest
{
    auto socket = new TcpSocket();
    socket.connect(new InternetAddress("192.168.59.103", 12202));
    auto logger = new TcpGrayLogger(socket, "TCP", LogLevel.all);
    logger.error("===== TCP.0 =====");
    logger.error("===== TCP.1 =====");
    logger.error("===== TCP.2 =====");
}

/// HTTP
version(gelf_test_http)
unittest
{
    import std.format: format;
    import std.net.curl;
    foreach(i, c; [Compress.init, new Compress, new Compress(HeaderFormat.gzip)])
    {
        auto logger = new HttpGrayLogger(HTTP("192.168.59.103:12204/gelf"), c, "HTTP%s".format(i), LogLevel.all);
        logger.errorf("===== HTTP #%s =====", i);
    }
}

// Usage examples. The nested functions are only compiled, never called.
unittest
{
    void t_udp()
    {
        Compress c; //`null` value for no compression
        // c = new Compress;
        // c = new Compress(HeaderFormat.gzip);
        auto socket = new UdpSocket();
        socket.connect(new InternetAddress("192.168.59.103", 12201));
        // The last param is the UDP chunk size. It is an optional parameter with a default value of 8192.
        sharedLog = new UdpGrayLogger(socket, c, "YourServiceName", LogLevel.all, 4096);
        error("===== Error Information =====");
    }

    void t_tcp()
    {
        import std.typecons: Yes, No;
        auto socket = new TcpSocket();
        socket.connect(new InternetAddress("192.168.59.103", 12201));
        /+Default value for nullDelimeter is `Yes`.
        Newline delimiter would be used if nullDelimeter is `false`/`No`.+/
        sharedLog = new TcpGrayLogger(socket, "YourServiceName", LogLevel.all, Yes.nullDelimeter);
        error("===== Error Information =====");
    }

    void t_http()
    {
        import std.net.curl: HTTP;
        Compress c; //`null` value for no compression
        // c = new Compress;
        // c = new Compress(HeaderFormat.gzip);
        sharedLog = new HttpGrayLogger(HTTP("192.168.59.103:12204/gelf"), c, "YourServiceName", LogLevel.all);
        error("===== Error Information =====");
    }
}

public import std.experimental.logger.core;

import std.socket;
import std.format : formattedWrite;
import std.datetime : Date, DateTime, SysTime, UTC;
import std.concurrency : Tid;
import std.zlib: Compress, HeaderFormat;
import core.stdc.errno: errno, EINTR;

/++
HTTP Graylog Logger
+/
class HttpGrayLogger : GrayLogger
{
    import std.net.curl;

    protected HTTP _http;

    /++
    Params:
        http = HTTP configuration. See `std.net.curl`.
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
    +/
    this(HTTP http, Compress compress, string host, LogLevel v) @trusted
    {
        _http = http;
        super(compress, host, v);
    }

    /++
    Serializes the entry and sends it as the body of one HTTP request.
    Transport errors are ignored (`No.throwOnError`).
    +/
    override protected void writeLogMsg(ref LogEntry payload) @trusted
    {
        // Registered before `fillAppender` so that a failure while formatting
        // does not leave partial data behind for the next message.
        scope(exit) clearAppender;
        fillAppender(payload);
        auto msg = _dataAppender.data;
        http.contentLength = msg.length;
        http.onSend =
            (void[] data)
            {
                import std.algorithm.comparison: min;
                // Hand over as much of the remaining message as fits into the buffer.
                immutable len = min(data.length, msg.length);
                if (len)
                {
                    data[0..len] = msg[0..len];
                    msg = msg[len .. $];
                }
                return len;
            };
        import std.typecons: No;
        http.perform(No.throwOnError);
    }

    /// Returns: the HTTP configuration passed to the constructor.
    final HTTP http() @property nothrow
    {
        return _http;
    }
}

/++
TCP Graylog Logger
+/
class TcpGrayLogger : SocketGrayLogger
{
    import std.typecons: Flag, Yes;

    private immutable string delim;

    /++
    Graylog TCP connection does not support compression.
    Params:
        socket = remote blocking TCP socket
        host = local service name
        v = log level
        useNull = Use null byte as frame delimiter? Otherwise newline delimiter is used.
    +/
    this(TcpSocket socket, string host, LogLevel v, Flag!"nullDelimeter" useNull = Yes.nullDelimeter) @safe
    {
        delim = useNull ? "\0" : "\n";
        super(socket, null, host, v);
    }

    /++
    Serializes the entry, appends the frame delimiter and writes everything to the socket.
    Nothing is sent if the socket is not alive; send errors other than `EINTR` abort silently.
    +/
    override protected void writeLogMsg(ref LogEntry payload) @trusted
    {
        if(!socket.isAlive)
        {
            // The socket is dead.
            // Do nothing
            return;
        }
        // Registered before `fillAppender` so that a failure while formatting
        // does not leave partial data behind for the next message.
        scope(exit) clearAppender;
        fillAppender(payload);
        _dataAppender.put(cast(const(ubyte)[]) delim);
        auto data = _dataAppender.data;
        do
        {
            immutable status = socket.send(data);
            if(status == Socket.ERROR)
            {
                if(errno == EINTR)
                {
                    // Probably the GC interrupted the process.
                    // Try again
                    continue;
                }
                // Failed to send data
                // Do nothing
                break;
            }
            // Drop the part that has been written and continue with the rest.
            data = data[status..$];
        }
        while(data.length);
    }
}

/++
UDP Graylog Logger
+/
class UdpGrayLogger : SocketGrayLogger
{
    /// Reusable datagram buffer: 12 byte chunk header followed by the chunk payload.
    protected ubyte[] _chunk;
    import std.random;
    import std.datetime;

    /// Generator for the message ids of chunked messages.
    Mt19937 gen;

    /++
    Params:
        socket = remote blocking UDP socket
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
        chunk = maximal chunk size (size of UDP datagram)
    Throws: `Exception` if `chunk` is less than 512.
    +/
    this(UdpSocket socket, Compress compress, string host, LogLevel v, int chunk = 8192) @safe
    {
        super(socket, compress, host, v);
        if(chunk < 512)
            throw new Exception("chunk must be greater or equal to 512");
        _chunk = new ubyte[chunk];
        // Magic bytes that start every chunked datagram.
        _chunk[0] = 0x1e;
        _chunk[1] = 0x0f;
        gen = typeof(gen)(cast(uint)(MonoTime.currTime.ticks ^ (uniform!size_t * hashOf(host))));
    }

    /++
    Sends one datagram, retrying when the call is interrupted (`EINTR`).
    Returns: `true` if the whole datagram was sent.
    +/
    protected bool send(const(void)[] data) @safe
    {
        for(;;)
        {
            immutable status = socket.send(data);
            if(status == Socket.ERROR)
            {
                if(errno == EINTR)
                {
                    // Probably the GC interrupted the process.
                    // Try again
                    continue;
                }
                // Failed to send the datagram.
                // Do nothing
                return false;
            }
            return status == data.length;
        }
    }

    /++
    Serializes the entry and sends it as a single datagram or, if it does not fit
    into one, as a sequence of chunks. Messages that would need more than 128
    chunks are dropped. Nothing is sent if the socket is not alive.
    +/
    override protected void writeLogMsg(ref LogEntry payload) @trusted
    {
        if(!socket.isAlive)
        {
            // The socket is dead.
            // Do nothing
            return;
        }
        // Registered before `fillAppender` so that a failure while formatting
        // does not leave partial data behind for the next message.
        scope(exit) clearAppender;
        fillAppender(payload);
        auto data = _dataAppender.data;
        if(data.length <= _chunk.length)
        {
            // send all data as single datagram
            send(data);
        }
        else
        {
            // send all data by chunks
            import std.range: chunks, enumerate;
            // 12 bytes of every datagram are taken by the chunk header.
            auto chs = (cast(ubyte[])data).chunks(_chunk.length - 12);
            immutable len = chs.length;
            if(len > 128)
            {
                // ignore huge msg
                return;
            }
            // Header layout: [0..2] magic, [2..10] message id, [10] sequence number, [11] sequence count.
            ulong[1] id = void;
            id[0] = uniform!ulong(gen);
            //Endianness does not matter
            _chunk[2..10] = cast(ubyte[]) id;
            _chunk[11] = cast(ubyte) len;
            foreach(i, ch; chs.enumerate)
            {
                _chunk[10] = cast(ubyte) i;
                immutable datagramLength = 12 + ch.length;
                _chunk[12 .. datagramLength] = ch[];
                immutable success = send(_chunk[0 .. datagramLength]);
                if(!success)
                {
                    // The message is incomplete anyway, so stop sending.
                    return;
                }
            }
        }
    }
}

/++
Abstract Socket Graylog Logger
+/
abstract class SocketGrayLogger : GrayLogger
{
    protected Socket _socket;

    /++
    Params:
        socket = remote blocking socket
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
    Throws: `SocketException` if `socket` is not blocking.
    +/
    this(Socket socket, Compress compress, string host, LogLevel v) @safe
    {
        if(!socket.blocking)
            throw new SocketException("SocketGrayLogger: socket must be blocking.");
        _socket = socket;
        super(compress, host, v);
    }

    /// Returns: the socket passed to the constructor.
    final Socket socket() @property @safe pure nothrow @nogc
    {
        return _socket;
    }
}

/++
Abstract Graylog Logger
+/
abstract class GrayLogger : Logger
{
    enum string gelfVersion = "1.1";
    import std.array: appender, Appender;

    protected string _host;
    protected Compress _compress;
    /// Constant JSON prefix of every message, up to and including `"short_message":`.
    protected immutable string _msgStart;
    /// Buffer holding the serialized (and optionally compressed) message.
    protected Appender!(ubyte[]) _dataAppender;

    /++
    Params:
        compress = compress algorithm. Set `null` or `Compress.init` to send messages without compression.
        host = local service name
        v = log level
    +/
    this(Compress compress, string host, LogLevel v) @safe
    {
        _dataAppender = appender!(ubyte[]);
        _host = host;
        _compress = compress;
        // The host is escaped so that quotes or backslashes in it cannot break the JSON.
        string quotedHost;
        putJsonString((const(char)[] part) { quotedHost ~= part; }, host);
        _msgStart = `{"version":"` ~ gelfVersion ~ `","host":` ~ quotedHost ~ `,"short_message":`;
        super(v);
    }

    /++
    Writes `str` to `sink` as a JSON string literal, including the surrounding quotes.
    `"`, `\` and all characters below 0x20 are escaped; everything else is passed through unchanged.
    +/
    protected static void putJsonString(scope void delegate(const(char)[]) sink, scope const(char)[] str) @trusted
    {
        static immutable hexDigits = "0123456789abcdef";
        sink(`"`);
        size_t start = 0; // start of the pending run of characters that need no escaping
        foreach(i, char ch; str)
        {
            char[6] buf = void;
            const(char)[] esc;
            switch(ch)
            {
                case '"':  esc = `\"`; break;
                case '\\': esc = `\\`; break;
                case '\n': esc = `\n`; break;
                case '\r': esc = `\r`; break;
                case '\t': esc = `\t`; break;
                case '\b': esc = `\b`; break;
                case '\f': esc = `\f`; break;
                default:
                    if(ch >= 0x20)
                        continue; // plain character, stays in the pending run
                    buf[0 .. 4] = `\u00`;
                    buf[4] = hexDigits[ch >> 4];
                    buf[5] = hexDigits[ch & 0x0F];
                    esc = buf[];
                    break;
            }
            if(start < i)
                sink(str[start .. i]);
            sink(esc);
            start = i + 1;
        }
        if(start < str.length)
            sink(str[start .. $]);
        sink(`"`);
    }

    /++
    Serializes `payload` as a GELF JSON object and passes it to `sink` piece by piece.
    Params:
        sink = receiver of the JSON text
        payload = log entry to serialize
    +/
    protected void formatMessage(scope void delegate(const(char)[]) sink, ref LogEntry payload) @trusted
    {
        import std.format;
        FormatSpec!char fmt;

        sink(_msgStart);
        putJsonString(sink, payload.msg);

        sink(`,"timestamp":`);
        auto time = payload.timestamp.toUTC;
        sink.formatValue(time.toUnixTime, fmt);
        immutable msc = time.fracSecs.total!"msecs";
        if(msc)
        {
            // Zero padded: 5 ms must be written as ".005", not ".5".
            sink.formattedWrite(".%03d", msc);
        }

        sink(`,"level":`);
        uint level = void;
        final switch(payload.logLevel) with(LogLevel)
        {
            case all:     level = 7; break; // Debug: debug-level messages
            case trace:   level = 6; break; // Informational: informational messages
            case info:    level = 5; break; // Notice: normal but significant condition
            case warning: level = 4; break; // Warning: warning conditions
            case error:   level = 3; break; // Error: error conditions
            case critical:level = 2; break; // Critical: critical conditions
            case fatal:   level = 1; break; // Alert: action must be taken immediately
            case off:     level = 0; break; // Emergency: system is unusable
        }
        sink.formatValue(level, fmt);

        sink(`,"line":`);
        sink.formatValue(payload.line, fmt);
        sink(`,"file":`);
        putJsonString(sink, payload.file);
        sink(`,"_func_name":`);
        putJsonString(sink, payload.funcName);
        sink(`,"_pretty_func_name":`);
        putJsonString(sink, payload.prettyFuncName);
        sink(`,"_module_name":`);
        putJsonString(sink, payload.moduleName);
        sink(`}`);
    }

    /++
    Appends the serialized message to `_dataAppender`,
    compressed with `_compress` if a compressor is set.
    +/
    final void fillAppender(ref LogEntry payload) @trusted
    {
        if(_compress)
        {
            formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) _compress.compress(str)); }, payload);
            _dataAppender.put(cast(const(ubyte)[]) _compress.flush);
        }
        else
        {
            formatMessage( (str) { _dataAppender.put(cast(const(ubyte)[]) str); }, payload);
        }
    }

    /++
    Empties `_dataAppender`. A buffer that has grown beyond 8192 elements is
    released instead of being kept for reuse.
    +/
    final void clearAppender()
    {
        enum ml = 8192;
        if(_dataAppender.capacity > ml)
        {
            // `shrinkTo` can only shorten the stored data and throws when asked for
            // more than the current length, so the oversized buffer is dropped by
            // starting over with a fresh appender.
            _dataAppender = appender!(ubyte[]);
        }
        else
        {
            _dataAppender.clear;
        }
    }

    /// Returns: the local service name passed to the constructor.
    final string host() @property @safe pure nothrow @nogc
    {
        return _host;
    }
}