Simovits

Lumberjack protocol v2 på 100 rader C#

I Elasticsearch-stacken finns Logstash med som loggmottagare. Logstash är ofta inte önskvärt som loggmottagare eftersom det utnyttjar en stor mängd systemresurser vilket förstås inte alltid är lämpligt. Därför används ofta andra loggmottagare eller egenutvecklade loggmottagare.

Lumberjack är det protokoll som vanligen används av Elasticsearch beats-suiten (https://www.elastic.co/products/beats) såsom Filebeat och Winlogbeat då loggar ska skeppas. Protokollet har i sig relativt små overhead-förluster och man har så försökt skapa protokollet för att passa till överföring av loggar. Det är inte så väldokumenterat som man skulle önska men å andra sidan är det open source så det går fint att läsa källkoden (som är skriven i Golang). Den dokumentation jag hittade efter lite googlingar är följande: https://github.com/elastic/logstash-forwarder/blob/master/PROTOCOL.md .  När man ska skeppa stora mängder loggar är det viktigt att tänka på att protokollet som används inte tar för mycket prestanda ur nät och systemresurser för loggmottagare. Av den anledningen gjorde jag en liten exempel-implementation av Lumberjack v2 i c# så jag kan vara säker på exakt hur det funkar.

Lumberjack v2 är, som sagt ett okomplicerat protokoll att implementera och fungerar vid normal operation enligt följande:

  1. Beats skickar window-size vilket i praktiken är det antal frames som loggmottagaren ska ta emot innan ett ACK returneras.
  2. Beats skickar det antal frames som tidigare specificerats i steg 1.
  3. Loggmottagaren tar emot alla frames fram till den sista i fönstret och skickar ett ACK till Beats.
  4. Beats tar emot ACK och noterar att loggmottagaren tagit emot alla frames i fönstret och börjar därefter om på steg 1 (och skickar ny window-size).

Det finns 3 typer av frames för loggrader: key-value och json. Frames kan även komprimeras med zlib för att spara utrymme i nätverkstrafik.

Nedan visas en enkel implementation för okomprimerade frames med json-struktur på cirka 100 kodrader (utan felhantering etc. för att enklare kunna fokusera på själva protokollet). Ett exekverbart exempel med tillhörande boilerplate-kod kan hittas under: https://github.com/jamesrep/lumbercsharp .

 

    public class LumberjackTemplate
    {
        public delegate void CALLBACK_onDataFrame(LumberFrame frame);
        CALLBACK_onDataFrame onDataFrame = null;
        bool bContinue = true;
        System.Text.Encoding encoder = new  System.Text.ASCIIEncoding();
        uint windowSize = 0;
        byte[] ackBytes = new byte[6];
        uint lastFrameToAck = 0;
        uint frameReceivedCount = 0;


        public class LumberFrame
        {
            public int version;
            public int frameType;
            public uint sequenceNumber;
            public int pairCount;
            public uint keyLength;
            public uint valueLength;
            public string strJson;
        }

        public void setDataCallback(CALLBACK_onDataFrame cb) { this.onDataFrame = cb; }
        
        // Send ack for the received sequence-number (bulk-ack)
        void sendAck(Stream sr, uint sequenceNumber)
        {
            byte[] bts = BitConverter.GetBytes(sequenceNumber);
            ackBytes[0] = 0x32;
            ackBytes[1] = 0x41;
            Array.Copy(bts, 0, ackBytes, 2, 4);
            Array.Reverse(ackBytes, 2, 4);

            sr.Write(ackBytes, 0, ackBytes.Length);
            sr.Flush();
        }

        // Parse
        public void parseStream(Stream sr)
        {
            while (bContinue)
            {
                LumberFrame frame = new LumberFrame();
                frame.version = sr.ReadByte();

                if (frame.version != 0x32) throw new NotImplementedException("Frame version not supported");                                

                frame.frameType = sr.ReadByte(); // Determine type of frame

                if (frame.frameType == 0x57)
                {

                    byte[] btsWindowSize = new byte[4];
                    int readBytes = sr.Read(btsWindowSize, 0, btsWindowSize.Length);

                    if (readBytes != btsWindowSize.Length) //error
                    {
                        sendAck(sr, lastFrameToAck); // We ack the last frame we received then crash the connection
                        return;
                    }

                    Array.Reverse(btsWindowSize);
                    windowSize = BitConverter.ToUInt32(btsWindowSize, 0); // the number of frames before ack.
                    frameReceivedCount = 0; // reset the frame-received-counter

                    continue;
                }
                else if (frame.frameType == 0x4a)
                {
                    byte[] btsHeader = new byte[8];
                    int retval = sr.Read(btsHeader, 0, btsHeader.Length);

                    if (retval != btsHeader.Length) return;

                    Array.Reverse(btsHeader, 0, 4);
                    Array.Reverse(btsHeader, 4, 4);

                    frame.sequenceNumber = BitConverter.ToUInt32(btsHeader, 0);
                    uint payloadLength = (uint)BitConverter.ToInt32(btsHeader, 4);
                    byte[] btsAll = new byte[payloadLength];
                    retval = sr.Read(btsAll, 0, (int)payloadLength);
                    frame.strJson = encoder.GetString(btsAll);

                    if (onDataFrame != null) onDataFrame(frame);
                    
                    lastFrameToAck = frame.sequenceNumber;
                    frameReceivedCount++;

                    if (frameReceivedCount >= windowSize)
                    {  
                        frameReceivedCount = 0;
                        sendAck(sr, lastFrameToAck);
                    }
                }
                else
                {
                    throw new NotImplementedException("Frame type not supported");
                }
            }
        }
}