Convert embedded XML in log to JSON in Logstash

Logs are sent from Filebeat to Logstash and I wrote a pattern to parse them. Developers added logs that have embedded XML inside them, and here is a sample:

2024-07-08 15:18:35,608 INFO |body=<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ss2:normal xmlns:ns2="http://service..com/">
    <context>
        <data>
            <key>ADDRESS</key>
            <value>0.0.0.0</value>
        </data>
        <data>
            <key>ssid</key>
            <value>d1d71e02-25ff-1</value>
        </data>
        <data>
            <key>m-id</key>
            <value>25440fb</value>
        </data>
    </context>
    <request>
        <amount>50</amount>
    </request>
</ss2:normal>

How can I parse the body section and retrieve the values in JSON format, considering that the XML in the body changes dynamically depending on the type of service?

You could try something like

    if "|body=<?xml" in [message] {
        mutate { copy => { "message" => "[@metadata][message]" } }
        mutate { gsub => [ "[@metadata][message]", "^.*\|body=", "" ] }
        # Remove namespaces like ss2:
        mutate { gsub => [ "[@metadata][message]", "<(/?)\w+:", "<\1" ] }
        xml {
            source => "[@metadata][message]"
            target => "theXML"
            store_xml => true
            force_array => false
        }
        ruby {
            code => '
                data = event.remove("[theXML][context][data]")
                if data.is_a? Array
                    data.each { |x|
                        key = x["key"]
                        event.set("[theXML][context][#{key}]", x["value"])
                    }
                end
            '
        }
    }

Note that using ruby to convert the [context][data] array to hash values may result in an explosion of field names that causes downstream problems.
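For the sample message above, the resulting event would then contain roughly the following (a rubydebug-style sketch; the xmlns attribute and the other event fields are omitted):

        "theXML" => {
            "context" => {
                "ADDRESS" => "0.0.0.0",
                "ssid" => "d1d71e02-25ff-1",
                "m-id" => "25440fb"
            },
            "request" => {
                "amount" => "50"
            }
        }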


I need the log level and timestamp too; actually the whole thing should be treated as a single log event.
Do I need to change or add any configuration in Filebeat, for example a multiline codec? If so, how can I configure Filebeat for these specific logs?
Currently my Filebeat config is:

    - type: log
      fields:
        source: 'filebeat2'
        logID: logbackup
      fields_under_root: true
      enabled: true
      paths:
        - /home/logbackup/a.log
        - /home/logbackup/backup/a.log-*
        #- c:\programdata\elasticsearch\logs*
      #ignore_older: 24h
      close_inactive: 30m

Yes, you do. I was using

file { 
    path => "/home/user/test.txt" 
    sincedb_path => "/dev/null" 
    start_position => beginning 
    codec => multiline { 
        pattern => "^\d{4}-\d{2}-\d{2} " 
        negate => true 
        what => previous 
        auto_flush_interval => 2 
        multiline_tag => "" 
    }
}

but if you are using filebeat you must do the multiline processing there. I don't run filebeat so I cannot help you with the syntax.
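For reference, a rough sketch of what the equivalent settings look like on a Filebeat log input, based on Filebeat's documented multiline options (untested here, so adjust to your setup):

    - type: log
      paths:
        - /home/logbackup/a.log
      # lines that do not start with a timestamp are appended to the previous line,
      # mirroring the Logstash multiline codec above
      multiline.pattern: '^\d{4}-\d{2}-\d{2} '
      multiline.negate: true
      multiline.match: after

`multiline.negate: true` together with `multiline.match: after` corresponds to `negate => true` / `what => previous` in the Logstash codec.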

May I use grok to parse the first section of the log, up to where the XML part starts? For example:

    (?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:logLevel}.?hub_(?<someField>[a-zA-Z0-9._-]+).?%{UUID:correlationId}|body=%{GREEDYDATA:body}

Your example message does not have the hub_ or UUID, but you could try something like

    grok { match => { "message" => "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:logLevel}.?(hub_(?<someField>[a-zA-Z0-9._-]+).?%{UUID:correlationId})?\|body=%{GREEDYDATA:body}" } }
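Once grok has populated [body], the earlier mutate/xml steps can be pointed at that field instead of [message]; a sketch reusing the same field names as above. Note that %{GREEDYDATA} does not match across newlines, so on a multiline event you may need a capture such as (?<body>(.|\n)*) for the body instead.

    # after grok has extracted [body]
    mutate { gsub => [ "body", "<(/?)\w+:", "<\1" ] }   # strip namespace prefixes
    xml {
        source => "body"
        target => "theXML"
        store_xml => true
        force_array => false
    }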