How to format the xml logs into json format in logstash

My Log format is like below.. It is c++ app logs where we have request & response xmls within the logs. I am interested to convert those xmls into json format and save it in output file.

12/20 00:00:23.00 dte.cpp(28367)1914 IDL::ProcessRequestXML: DTE_KEY: Queueing request: SID:A232000004-, PERSISTID:46, WPID:0, SYS:NFAM, EMPID:, CMD:CLOSE_SESSION, PRIORITY:Default, TIMEOUT:300 sec
      12/20 00:00:23.05 dte.cpp(28367)563  PendingResponseCount: Set(46) -> 1
      12/20 00:00:23.05 priorit(28367)422  PutOnQueue: 46, "", defWait = 0, minWait = 0, timeoutInterval = 300
      12/20 00:00:23.05 priorit(28367)457  PutOnQueue: Calling DequeueWakeUp() ...
      12/20 00:00:23.05 priorit(28367)916  DequeueWakeUp: NULL
      12/20 00:00:23.05 priorit(28376)703  Dequeue: return(46)
      12/20 00:00:23.05 dte.cpp(28376)2386 ProcessRequest: DTE_KEY: Received XML request: SID:A232000004-, PERSISTID: 46, WPID:72000001, PRIORITY:Default, TIMEOUT:300 sec, returnIOR: IOR:010000001300000049444c3a4f7373526573756c74733a312e30000001000000000000007000000001010120100000003131332e3134302e3230372e31323900b78a55451b00000014010f0052535459b6fb5d642d010002000000010000000300000053020000000000000008000000010000d0004f41540100000018000000010200d00100010001000000010001050901010000000000
<DELPHI_REQUEST>
 <CTLHDR>
  <DIPVER>5.0</DIPVER>
  <DOMAIN>FTTP</DOMAIN>
  <SYS_NAME>NFAM</SYS_NAME>
  <DTIMEOUT>5</DTIMEOUT>
  <ACKMNTS>N</ACKMNTS>
  <MGRID>NFAM</MGRID>
 </CTLHDR>
 <REQUEST>
  <SID>A232000004-</SID>
  <REQUIRED>
   <COMMAND>CLOSE_SESSION</COMMAND>
  </REQUIRED>
 </REQUEST>
</DELPHI_REQUEST>
12/20 00:02:07.47 ft_acce(28395)786  LogThread:        TRDB_FT queue =    0 (   0) [    17,      0] XML
      12/20 00:02:07.47 ft_acce(28395)786  LogThread:       HNMNG_FT queue =    0 (   0) [    23,      0] XML
      12/20 00:02:42.00 dte.cpp(28367)1914 IDL::ProcessRequestXML: DTE_KEY: Queueing request: SID:A232000005-, PERSISTID:47, WPID:0, SYS:NFAM, EMPID:, CMD:CLOSE_SESSION, PRIORITY:Default, TIMEOUT:300 sec
      12/20 00:02:42.00 dte.cpp(28367)563  PendingResponseCount: Set(47) -> 1
      12/20 00:02:42.00 priorit(28367)422  PutOnQueue: 47, "", defWait = 0, minWait = 0, timeoutInterval = 300
      12/20 00:02:42.00 priorit(28367)457  PutOnQueue: Calling DequeueWakeUp() ...
      12/20 00:02:42.00 priorit(28367)916  DequeueWakeUp: NULL
      12/20 00:02:42.00 priorit(28363)703  Dequeue: return(47)
      12/20 00:02:42.00 dte.cpp(28363)2386 ProcessRequest: DTE_KEY: Received XML request: SID:A232000005-, PERSISTID: 47, WPID:72000002, PRIORITY:Default, TIMEOUT:300 sec, returnIOR: IOR:010000001300000049444c3a4f7373526573756c74733a312e30000001000000000000007000000001010145100000003131332e3134302e3230372e31323900b78a203c1b00000014010f0052535459b6fb5d642d010002000000010000000300000056020000000000000008000000010800d0004f41540100000018000000010200d00100010001000000010001050901010000000000
<DELPHI_REQUEST>
 <CTLHDR>
  <DIPVER>5.0</DIPVER>
  <DOMAIN>FTTP</DOMAIN>
  <SYS_NAME>NFAM</SYS_NAME>
  <DTIMEOUT>5</DTIMEOUT>
  <ACKMNTS>N</ACKMNTS>
  <MGRID>NFAM</MGRID>
 </CTLHDR>
 <REQUEST>
  <SID>A232000005-</SID>
  <REQUIRED>
   <COMMAND>CLOSE_SESSION</COMMAND>
  </REQUIRED>
 </REQUEST>
</DELPHI_REQUEST>

I am using below code in my conf file.. But the output it is giving is not having <DELPHI_REQUEST> as start tag instead it has other tags like below. Can anyone help me with the changes i should add in the code.

input {
file {
        path => "/opt/delphi/Bakkiyaraj/FIOS_1.log"
        start_position => "beginning"
        codec => multiline { pattern => '^\s*</DELPHI.*>' negate => true what => "next" auto_flush_interval => 1 }
        sincedb_path => "/dev/null"
}
}
filter
        { xml { source => "message" target => "theXML" }

mutate {
    remove_field => ["message"]
  }
}

output {
      file {
         path => "/opt/delphi/Bakkiyaraj/logstash-7.5.1/bin/logs/output2.log"
         codec => rubydebug { metadata => false }
      }
   }

O/p I got for first XML in the log:

{
          "tags" => [
        [0] "multiline"
    ],
          "path" => "/opt/delphi/Bakkiyaraj/FIOS_1.log",
      "@version" => "1",
    "@timestamp" => 2020-03-24T10:45:12.878Z,
          "host" => "delfdta10aws",
        "theXML" => {
         "CTLHDR" => [
            [0] {
                   "MGRID" => [
                    [0] "NFAM"
                ],
                  "DIPVER" => [
                    [0] "5.0"
                ],
                  "DOMAIN" => [
                    [0] "FTTP"
                ],
                 "ACKMNTS" => [
                    [0] "N"
                ],
                "SYS_NAME" => [
                    [0] "NFAM"
                ],
                "DTIMEOUT" => [
                    [0] "5"
                ]
            }
        ],
        "REQUEST" => [
            [0] {
                     "SID" => [
                    [0] "A232000004-"
                ],
                "REQUIRED" => [
                    [0] {
                        "COMMAND" => [
                            [0] "CLOSE_SESSION"
                        ]
                    }
                ]
            }
        ]
    }
}

That is expected. The top level element is not retained.

You may find setting the force_array option to false is useful, also, I would add 'remove_field => [ "message" ]' to the xml filter itself rather than putting it in a separate mutate filter. That way, if the xml filter fails to parse the message it will not be removed.

Hi Badger,
Thanks for your reply.
I have added force_array = false in the filter as below

filter
        { xml { source => "message" force_array => false target => "theXML" }

mutate {
    remove_field => ["message","tags","@version"]
  }
}

but still the output doesn't have my top tag i.e. <DELPHI_REQUEST>. That is important for me because without that I cannot differentiate whether it is a request XML or response XML as the tags within them will have same names.

Can you please help me here.

An xml filter will not retain the top level element of the XML, so you will have to check the message contents using something like grok.

HI Badger,
I am very new to logstash and trying to write grok for my scenario but it is not giving me the required results. Can u pls help to share the grok code I need to get the required output.

Below is my sample log and what I need is only the xmls from the log.

12/20 00:00:23.00 dte.cpp(28367)1914 IDL::ProcessRequestXML: DTE_KEY: Queueing request: SID:A232000004-, PERSISTID:46, WPID:0, SYS:NFAM, EMPID:, CMD:CLOSE_SESSION, PRIORITY:Default, TIMEOUT:300 sec
      12/20 00:00:23.05 dte.cpp(28367)563  PendingResponseCount: Set(46) -> 1
      12/20 00:00:23.05 priorit(28367)422  PutOnQueue: 46, "", defWait = 0, minWait = 0, timeoutInterval = 300
      12/20 00:00:23.05 priorit(28367)457  PutOnQueue: Calling DequeueWakeUp() ...
      12/20 00:00:23.05 priorit(28367)916  DequeueWakeUp: NULL
      12/20 00:00:23.05 priorit(28376)703  Dequeue: return(46)
      12/20 00:00:23.05 dte.cpp(28376)2386 ProcessRequest: DTE_KEY: Received XML request: SID:A232000004-, PERSISTID: 46, WPID:72000001, PRIORITY:Default, TIMEOUT:300 sec, returnIOR: IOR:010000001300000049444c3a4f7373526573756c74733a312e30000001000000000000007000000001010120100000003131332e3134302e3230372e31323900b78a55451b00000014010f0052535459b6fb5d642d010002000000010000000300000053020000000000000008000000010000d0004f41540100000018000000010200d00100010001000000010001050901010000000000
<DELPHI_REQUEST>
 <CTLHDR>
  <DIPVER>5.0</DIPVER>
  <DOMAIN>FTTP</DOMAIN>
  <SYS_NAME>NFAM</SYS_NAME>
  <DTIMEOUT>5</DTIMEOUT>
  <ACKMNTS>N</ACKMNTS>
  <MGRID>NFAM</MGRID>
 </CTLHDR>
 <REQUEST>
  <SID>A232000004-</SID>
  <REQUIRED>
   <COMMAND>CLOSE_SESSION</COMMAND>
  </REQUIRED>
 </REQUEST>
</DELPHI_REQUEST>
12/20 00:02:07.47 ft_acce(28395)786  LogThread:        TRDB_FT queue =    0 (   0) [    17,      0] XML
      12/20 00:02:07.47 ft_acce(28395)786  LogThread:       HNMNG_FT queue =    0 (   0) [    23,      0] XML
      12/20 00:02:42.00 dte.cpp(28367)1914 IDL::ProcessRequestXML: DTE_KEY: Queueing request: SID:A232000005-, PERSISTID:47, WPID:0, SYS:NFAM, EMPID:, CMD:CLOSE_SESSION, PRIORITY:Default, TIMEOUT:300 sec
      12/20 00:02:42.00 dte.cpp(28367)563  PendingResponseCount: Set(47) -> 1
      12/20 00:02:42.00 priorit(28367)422  PutOnQueue: 47, "", defWait = 0, minWait = 0, timeoutInterval = 300
      12/20 00:02:42.00 priorit(28367)457  PutOnQueue: Calling DequeueWakeUp() ...
      12/20 00:02:42.00 priorit(28367)916  DequeueWakeUp: NULL
      12/20 00:02:42.00 priorit(28363)703  Dequeue: return(47)
      12/20 00:02:42.00 dte.cpp(28363)2386 ProcessRequest: DTE_KEY: Received XML request: SID:A232000005-, PERSISTID: 47, WPID:72000002, PRIORITY:Default, TIMEOUT:300 sec, returnIOR: IOR:010000001300000049444c3a4f7373526573756c74733a312e30000001000000000000007000000001010145100000003131332e3134302e3230372e31323900b78a203c1b00000014010f0052535459b6fb5d642d010002000000010000000300000056020000000000000008000000010800d0004f41540100000018000000010200d00100010001000000010001050901010000000000
<DELPHI_REQUEST>
 <CTLHDR>
  <DIPVER>5.0</DIPVER>
  <DOMAIN>FTTP</DOMAIN>
  <SYS_NAME>NFAM</SYS_NAME>
  <DTIMEOUT>5</DTIMEOUT>
  <ACKMNTS>N</ACKMNTS>
  <MGRID>NFAM</MGRID>
 </CTLHDR>
 <REQUEST>
  <SID>A232000005-</SID>
  <REQUIRED>
   <COMMAND>CLOSE_SESSION</COMMAND>
  </REQUIRED>
 </REQUEST>
</DELPHI_REQUEST>

basically I need

<DELPHI_REQUEST>
content within these tags
</DELPHI_REQUEST>
<DELPHI_RESPONSE>
content within these tags
<DELPHI_RESPONSE> 

Later I want to convert it to json format output

Use an xml filter to parse the bulk of it and then something like

grok { match => { "message" => "<DELPHI_%{WORD:messageType}>" } }

to determine whether it is a request or response.

Hi Badger,

I tried below config file as suggested but the o/p doesn't show this new field.


input {
file {
        path => "/opt/delphi/Bakkiyaraj/FIOS_1.log"
        #path => "/opt/delphi/logs/saved/dte_FIOS.03-24-2020.log-1"
        start_position => "beginning"
        #codec => multiline { pattern => '</DELPHI.*>' negate => true what => "" auto_flush_interval => 1 }
        sincedb_path => "/dev/null"
        codec => multiline {
                pattern => '^\s*</DELPHI.*>'
                negate => true
                what => "next"
                auto_flush_interval => 1
        }
}
}
filter
{
xml { source => "message" target => "theXML" }

grok { match => { "message" => "DELPHI_%{WORD:messageType}" }
       add_field => { "Mtype" => "%{messageType}" }
 }

#xml { source => "message" target => "theXML" }
mutate {
    remove_field => ["tags","@version","host","@timestamp","path"]
  }
}

output {
      file {
         path => "/opt/delphi/Bakkiyaraj/logstash-7.5.1/bin/logs/test.log"
        # codec => json
        codec => rubydebug { metadata => false }
      }
   }

Output:


     "theXML" => {
        "REQUEST" => [
            [0] {
                "REQUIRED" => [
                    [0] {
                        "COMMAND" => [
                            [0] "CLOSE_SESSION"
                        ]
                    }
                ],
                     "SID" => [
                    [0] "A232000005-"
                ]
            }
        ],
         "CTLHDR" => [
            [0] {
                  "DIPVER" => [
                    [0] "5.0"
                ],
                "SYS_NAME" => [
                    [0] "NFAM"
                ],
                 "ACKMNTS" => [
                    [0] "N"
                ],
                  "DOMAIN" => [
                    [0] "FTTP"
                ],
                   "MGRID" => [
                    [0] "NFAM"
                ],
                "DTIMEOUT" => [
                    [0] "5"
                ]
            }
        ]
    }
}