Parsing JSON fields in Cisco ESA (IronPort) logs

I use the Logstash filter below to parse Cisco ESA logs:

# Parses CEF-formatted events: splits the pipe-delimited "cef_message" header
# into named fields, then turns the trailing extension text into individual
# key/value fields with the kv filter.
filter {
    # Only events carrying the "cef" tag are processed.
    if "cef" in [tags] {
    mutate {
        # CEF:0 is pipe delimited, split into individual fields 
        split => ["cef_message", "|"]
        # Copy selected elements of the split array into named fields.
        # Elements 4 and 5 are not copied.
        add_field => { "cef_version" => "%{cef_message[0]}" }
        add_field => { "cef_device_vendor" => "%{cef_message[1]}" }
        add_field => { "cef_device_product" => "%{cef_message[2]}" }
        add_field => { "cef_device_version" => "%{cef_message[3]}" }
        add_field => { "cef_sig_severity" => "%{cef_message[6]}" }
        # Element 7 is the key=value text that the stages below parse.
        add_field => { "cef_kv_message" => "%{cef_message[7]}" }
    }

    # Prefix every "key=" token (a run of non-whitespace ending in "=") with
    # ", " so that the kv filter below can split pairs on commas.
    mutate {
        gsub => ["cef_kv_message", "(\S+=)", ", \1"]
    }
    # Replace single quotes with double quotes in the key=value text.
    mutate {
          gsub => [ "cef_kv_message", "'",'"' ]
     }
    # Parse "key=value" pairs separated by ",", trimming spaces from keys and
    # values, then drop the intermediate fields and the original message.
    kv {
        source => "cef_kv_message"
        trim_value => " "
        trim_key => " "
        value_split => "="
        field_split => ","
        remove_field => ["cef_kv_message", "message", "cef_message"]
    }
  }
}

Some logs contain fields in JSON format, such as SPFVerdict or AttachmentDetails. For AttachmentDetails, I want to extract the file name, the hash, and the size of each attachment.

Below is an example:

{
    "type": "syslog",
    "device_product": "C190 Email Security Appliance",
    "AMPVerdict": "UNKNOWN",
    "AttachmentDetails": "{\"image.png\": {\"AMP\": {\"Verdict\": [\"FILE UNKNOWN\", \"FILE UNKNOWN\", \"FILE UNKNOWN\", \"FILE UNKNOWN\", \"FILE UNKNOWN\"], \"fileHash\": [\"cfb8d81191be809c15ff909da67a8645f234c8659907b7983ea81\", \"99aec7300ba8816c3ff56db24b6ca9b177fb8d9ed31846a8912d\"]}, \"BodyScanner\": {}}}",
    "@timestamp": "2024-03-10T10:15:15.000Z",
    "dvc": "192.168....",
    "event_severity": "5",
    "host": "localhost",
    "CFVerdict": "NO_MATCH",
    "event_class_id": "ESA_CONSOLIDATED_LOG_EVENT",
    "@version": "1",
    "SPFVerdict": "{\"mailfrom\": {\"result\": \"Pass\", \"sender\": \"bounce@mailing.sender.localhost\"}}",
    "AVVerdict": "NEGATIVE",
    "event_name": "Consolidated Log Event"   
}

Any idea how to parse these fields, please?

Thanks

You can use a json filter

    # Parse the JSON text held in each field; remove_field names the original
    # string field to drop from the event.
    json { source => "SPFVerdict" remove_field => [ "SPFVerdict" ] }
    json { source => "AttachmentDetails" remove_field => [ "AttachmentDetails" ] }

@Badger thanks for your answer

I also tried the json filter, but it does not give the result I want.
With the json filter applied to AttachmentDetails:

{ 
AttachmentDetails
{"image.png": {"AMP": {"Verdict": "FILE UNKNOWN", "fileHash": "85923b8580223376e30ac790ad36f36"}, "BodyScanner": {}}}
}

=>

{
image_png
{"AMP":{"fileHash":"85e1223376e30ac790ad36f36","Verdict":"FILE UNKNOWN"},"BodyScanner":{}}
}

Also, the field name is not fixed: the attachment can be a PDF file or something else.

I want to extract Verdict and fileHash.

If you parse the JSON then you get

     "image.png" => {
            "AMP" => {
        "fileHash" => [
            [0] "cfb8d81191be809c15ff9096a71eccf932da67a8645f234c8659907b7983ea81",
            [1] "99aec7300ba8816c3ff5df94a1dc18fb6db24b6ca9b177fb8d9ed31846a8912d"
        ],
         "Verdict" => [
            [0] "FILE UNKNOWN",
            [1] "FILE UNKNOWN",
            [2] "FILE UNKNOWN",
            [3] "FILE UNKNOWN",
            [4] "FILE UNKNOWN"
        ]
    },

How do you want that restructured?

Below is an example of the result that I want:

To get this result I used a Ruby filter. The same logic gives the correct result when run as plain Ruby code (screenshot), but it does not work when I test it in Logstash.

Ruby code VSCode

# Sample payload: an already-parsed AttachmentDetails hash keyed by attachment
# file name. Each entry carries the AMP result and the BodyScanner data.
attachmentDetails = {
  "test.txt" => {
    "AMP" => { "Verdict" => "FILE UNKNOWN", "fileHash" => "7f843d263304fb0516d63ee7051f7b785cfa" },
    "BodyScanner" => { "fsize" => 10059 }
  },
  "image01.png" => {
    "AMP" => { "Verdict" => "FILE SAFE", "fileHash" => "a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6p7" },
    "BodyScanner" => { "fsize" => 2048 }
  }
}

case attachmentDetails
when Hash
  # Flattened view: numbered fileHash_N / Verdict_N / fsize_N keys, where N is
  # the 1-based position of the attachment in the payload.
  new_fields = {}

  attachmentDetails.each_value.with_index(1) do |details, position|
    amp = details["AMP"]
    scanner = details["BodyScanner"]

    # Hash and verdict are only recorded when the entry has an AMP section.
    if amp
      new_fields["fileHash_#{position}"] = amp["fileHash"]
      new_fields["Verdict_#{position}"] = amp["Verdict"]
    end
    # Size is only recorded when the entry has a BodyScanner section.
    new_fields["fsize_#{position}"] = scanner["fsize"] if scanner
  end

  puts "Before processing :", attachmentDetails
  puts "after processing:", new_fields
else
  puts "AttachmentDetails is not a Hash"
end

Ruby filter logstash :

# Only run when the event has an AttachmentDetails field.
if [AttachmentDetails] {
             
            # Flattens AttachmentDetails into numbered fileHash_N / Verdict_N /
            # fsize_N keys (N counts attachments from 1) and stores them in a new
            # "attachment_details" field.
            # NOTE(review): the code only acts when event.get("AttachmentDetails")
            # is already a Hash; if the field still holds raw JSON text, no field
            # is set - confirm the field is parsed before this filter runs.
            ruby {
                code => '
                    if event.get("AttachmentDetails").is_a?(Hash) 
                        new_fields = {}
                        index = 1  
                        event.get("AttachmentDetails").each do |file_name, file_details|  
                            if file_details["AMP"]
                                new_fields["fileHash_#{index}"] = file_details["AMP"]["fileHash"]
                                new_fields["Verdict_#{index}"] = file_details["AMP"]["Verdict"]
                            end
                            if file_details["BodyScanner"]
                                new_fields["fsize_#{index}"] = file_details["BodyScanner"]["fsize"]
                            end
                            index += 1
                        end
                        event.set("attachment_details", new_fields)
                    end
                '
            }
        }

Thanks for your answers :slight_smile:

Field names are case sensitive :slight_smile: If you fix that it seems to work

"attachment_details" => {
    "fileHash_1" => "7f843d263304fb0516d63ee7051f7b785cfa",
    "fileHash_2" => "a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6p7",
       "fsize_1" => 10059,
     "Verdict_1" => "FILE UNKNOWN",
     "Verdict_2" => "FILE SAFE",
       "fsize_2" => 2048
},

I found the solution. The field name was not at fault: AttachmentDetails is the event field name used in the Logstash filter, while attachmentDetails is only the local variable that holds the JSON value in the standalone Ruby code.

The issue was that Logstash treats the AttachmentDetails field as a plain string rather than as parsed JSON, which explains why the ruby filter did not work. I added a step that parses the field as JSON before processing it, and now it works.

if [AttachmentDetails] {
            # Turns AttachmentDetails (raw JSON text, or an already-parsed object
            # keyed by attachment file name) into four parallel arrays: file_name,
            # file_hash, file_verdict and file_size. Entry N of each array describes
            # the same attachment; nil marks a value missing for that attachment.
            # On success the original AttachmentDetails field is removed.
            # NOTE(review): malformed JSON text still raises inside the filter;
            # presumably the ruby filter then tags the event (tag_on_exception) and
            # leaves it otherwise unchanged - confirm against the plugin docs.
            ruby {
                # Load the JSON library once at pipeline startup, not on every event.
                init => 'require "json"'
                code => '
                    attachment_details = event.get("AttachmentDetails")

                    # The field normally arrives as raw JSON text; decode it first.
                    if attachment_details.is_a?(String)
                        attachment_details = JSON.parse(attachment_details)
                    end

                    # Only a name => details mapping can be flattened. Any other
                    # payload (array, number, ...) is left on the event untouched
                    # so that no data is lost.
                    if attachment_details.is_a?(Hash)
                        file_name = []
                        file_hash = []
                        file_verdict = []
                        file_size = []

                        attachment_details.each do |filename, file_details|
                            # Treat a missing or malformed entry as having no sections.
                            file_details = {} unless file_details.is_a?(Hash)
                            amp = file_details["AMP"]
                            scanner = file_details["BodyScanner"]

                            # Always push one value per array to keep them aligned.
                            file_name << filename
                            file_hash << (amp.is_a?(Hash) ? amp["fileHash"] : nil)
                            file_verdict << (amp.is_a?(Hash) ? amp["Verdict"] : nil)
                            file_size << (scanner.is_a?(Hash) ? scanner["fsize"] : nil)
                        end

                        event.set("file_name", file_name)
                        event.set("file_hash", file_hash)
                        event.set("file_verdict", file_verdict)
                        event.set("file_size", file_size)

                        event.remove("AttachmentDetails")
                    end
                '
            }
        }

Thanks @Badger :slight_smile: