Parse McAfee audit log file with Logstash

Hi folks,

I need your help to find the right way to parse this unstructured file:

________________________________________________________________________________
Timestamp  : 19/Apr/2022:05:50:02.771 +0200
User       : test
Action     : USER_LOGIN
Source Type: USER
Source ID  : 1.1.1.1
Appliance  : TEST_DEVICE
Details:
   User-Agent: Java/1.8.0_272
   Role    : Test L2 Admin
________________________________________________________________________________
Timestamp  : 19/Apr/2022:05:50:02.771 +0200
User       : test_23
Action     : USER_LOGIN
Source Type: USER
Source ID  : 1.1.1.2
Appliance  : TEST_DEVICE
Details:
   User-Agent: Java/1.8.0_272
   Role    : Test L3 Admin

Expected result

Timestamp  => 19/Apr/2022:05:50:02.771 +0200
User       => test_23
Action     => USER_LOGIN
Source Type=> USER
Source ID  => 1.1.1.2
Appliance  => TEST_DEVICE
User-Agent=> Java/1.8.0_272
Role    => Test L3 Admin

Regards

The events are delimited by a line of underscores, so you could use a multiline codec to combine all the lines for a single event.

file {
    path => "/home/user/foo.txt"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => multiline {
        # An event starts at a line of underscores; every line that does
        # not match the pattern is appended to the previous event.
        pattern => "_______"
        negate => true
        what => previous
        auto_flush_interval => 2
        multiline_tag => ""
    }
}
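
With negate => true and what => previous, every line that does not match the underscore pattern is appended to the previous event, so each flushed event begins at a delimiter line and holds everything up to the next one. For the first sample event the assembled [message] would look like:

________________________________________________________________________________
Timestamp  : 19/Apr/2022:05:50:02.771 +0200
User       : test
Action     : USER_LOGIN
Source Type: USER
Source ID  : 1.1.1.1
Appliance  : TEST_DEVICE
Details:
   User-Agent: Java/1.8.0_272
   Role    : Test L2 Admin

That multi-line [message] is what the grok patterns below match against.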

Then use grok

grok {
    break_on_match => false
    match => {
        "message" => [
            "Timestamp%{SPACE}:%{SPACE}(?<Timestamp>[^\n]+)\n",
            "User%{SPACE}:%{SPACE}(?<User>[^\n]+)\n",
            "Action%{SPACE}:%{SPACE}(?<Action>[^\n]+)\n",
            "Source Type%{SPACE}:%{SPACE}(?<Source Type>[^\n]+)\n",
            "Source ID%{SPACE}:%{SPACE}(?<Source ID>[^\n]+)\n",
            "Appliance%{SPACE}:%{SPACE}(?<Appliance>[^\n]+)\n",
            "User-Agent%{SPACE}:%{SPACE}(?<User-Agent>[^\n]+)\n",
            "Role%{SPACE}:%{SPACE}(?<Role>[^\n]+)\n"
        ]
    }
}

I would recommend against having spaces in your field names. Use [SourceType] rather than [Source Type].
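
For example, the affected patterns could capture straight into space-free names:

            "Source Type%{SPACE}:%{SPACE}(?<SourceType>[^\n]+)\n",
            "Source ID%{SPACE}:%{SPACE}(?<SourceID>[^\n]+)\n",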

Thanks Badger for your help. Please find my Logstash pipeline below. I'm still having some issues: the logs are not parsed. I can see in the Logstash logs that this pipeline is running, but I don't see any index created.

input {
    file {
        path => "/so/data/backups/test/audit*.log"
        sincedb_path => "/dev/null"
        start_position => "beginning"
        codec => multiline {
            pattern => "_______"
            negate => true
            what => previous
            auto_flush_interval => 2
            multiline_tag => "MCP-AUDIT-LOG"
        }
    }
}
filter {
    grok {
        break_on_match => false
        match => {
            "message" => [
                "Timestamp%{SPACE}:%{SPACE}(?<Timestamp>[^\n]+)\n",
                "User%{SPACE}:%{SPACE}(?<User>[^\n]+)\n",
                "Action%{SPACE}:%{SPACE}(?<Action>[^\n]+)\n",
                "Source Type%{SPACE}:%{SPACE}(?<Source_Type>[^\n]+)\n",
                "Source ID%{SPACE}:%{SPACE}(?<Source_ID>[^\n]+)\n",
                "Appliance%{SPACE}:%{SPACE}(?<Appliance>[^\n]+)\n",
                "User-Agent%{SPACE}:%{SPACE}(?<User-Agent>[^\n]+)\n",
                "Role%{SPACE}:%{SPACE}(?<Role>[^\n]+)\n"
            ]
        }
    }
}
output {
    elasticsearch {
        index => "mcp-audit-00001"
        hosts => ["https://es-node-01:9200"]
        ssl => true
        ssl_certificate_verification => true
        cacert => "/etc/logstash/elasticsearch-ca.pem"
        manage_template => true
        user => "elastic"
        password => ''
        codec => "plain"
    }
    stdout { codec => rubydebug }
}

I've also identified some extra fields in the log file.

Timestamp  : 19/Apr/2022:21:17:58.117 +0200
User       : test.user
Action     : DELETED_CONTENT
Source Name: LSP Profile groups
Source Type: LIST<String>
Source ID  : com.scur.type.string.19546
Source Path: /Lists/String/
Appliance  : PRATEST
Details:
   Entry   : xls\www_standard_test2
   Position: 8
   Entry   : xls\www_standard_test
   Position: 7
   Entry   : xls\www_avance_test2
   Position: 5
   Entry   : xls\www_avance_test
   Position: 4

From my perspective it would be great to create a field for each entry, like below:

Entry_8 => xls\www_standard_test2
Entry_7 => xls\www_standard_test
......

Add another grok pattern and a ruby filter

                "Role%{SPACE}:%{SPACE}(?<Role>[^\n]+)\n",
                "Details:%{GREEDYDATA:[@metadata][Details]}"
            ]
        }
    }
    ruby {
        code => '
            details = event.get("[@metadata][Details]")
            if details
                matches = details.scan(/Entry\s+: ([^\n]+)\n\s+Position: (\d+)/)
                # matches is an array of arrays containing the two
                # capture groups from the regexp we scanned for:
                # [["xls\\www_standard_test2", "8"], ... ]
                matches.each { |x|
                    event.set("Entry_#{x[1]}", x[0])
                }
            end
        '
    }
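
As a sanity check, here is a minimal standalone Ruby sketch (run outside Logstash, using the Entry/Position lines from the sample above) showing what the scan produces:

details = "   Entry   : xls\\www_standard_test2\n   Position: 8\n" \
          "   Entry   : xls\\www_standard_test\n   Position: 7\n"
matches = details.scan(/Entry\s+: ([^\n]+)\n\s+Position: (\d+)/)
matches.each { |entry, position| puts "Entry_#{position} => #{entry}" }
# Prints:
#   Entry_8 => xls\www_standard_test2
#   Entry_7 => xls\www_standard_test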

Note that in current versions the index option on an elasticsearch output is ignored when ILM is enabled; ILM takes over and the output writes to the rollover alias instead.
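
If you do need the index option to be honored, you can disable ILM on the output (a sketch; assumes a plugin version that supports the ilm_enabled option):

output {
    elasticsearch {
        hosts => ["https://es-node-01:9200"]
        index => "mcp-audit-00001"
        # keep ILM from overriding the index option
        ilm_enabled => false
        # ... ssl and credential options as in your pipeline ...
    }
}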

Thanks for your help.

I have tried:

input {
    tcp {
        port => 6514
    }
}

filter {
    grok {
        break_on_match => false
        match => {
            "message" => [
                "Nom du compte%{SPACE}:%{SPACE}(?<Nom_Compte>)%{SPACE}",
                "Domaine du compte%{SPACE}(?<Domaine_Compte>)%{SPACE}"
            ]
        }
    }
}

output {
    elasticsearch { hosts => ["localhost:9200"] }
}

But that doesn't work.
I get a "_grokparsefailure" tag in my logs.
