What is the best way to write a grok pattern for the log below?

Hi Folks,

Below is the line that I am trying to write a grok pattern for. The latter part of the line is JSON, but my codec => json does not match because of the non-JSON prefix.

Any idea or clue how I can match this, or do I need to write a complete grok pattern for the log entries below?

honeysap.events.logfeed - 2020-03-15 05:13:50,979 - EVENT    - {"timestamp": "2020-03-15 05:13:50.978092", "request": "AAAAQE5JX1JPVVRFAAIoAgAAAAEAAAAoAAAAFDE5Mi4xNjguNS4xMjcAMzI5OQAAMTAuMC4wLjEwMABzYXBkcDAxAAA=", "session": "4ba57f1d-b5ad-4ebf-9ef9-35f9e8860723", "target_port": 3299, "target_ip": "0.0.0.0", "data": "", "event": "Received packet", "service": "saprouter", "source_ip": "192.168.5.76", "response": "", "source_port": 52183}

My kv filter works fine when only the entries inside the braces are parsed:

input {
  stdin {
    codec => json
  }
}

filter {
  kv { }
}

output {
  stdout {}
}

I used grok to grab the timestamp and the main payload (which is JSON formatted) from the message, then a json filter to parse the payload.

input {
  file {
    path => "PATH/TO/honeysap.log"
    codec => plain {
      charset => "ISO-8859-1"
    }
    type => "honeysap"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Get rid of the logfile path if it isn't needed.
  mutate {
    remove_field => [ "path" ]
  }

  # Extract the timestamp and JSON payload. If successful remove the original message.
  grok {
    match => {
      "[message]" => "honeysap\.events\.logfeed%{SPACE}-%{SPACE}(?<datetime>20[0-3][0-9]-[0-1][0-9]-[0-3][0-9] [0-2][0-9]:[0-6][0-9]:[0-6][0-9],[0-9][0-9][0-9])%{SPACE}-%{SPACE}EVENT%{SPACE}-[^\{]+%{GREEDYDATA:payload}"
    }
    remove_field => [ "[message]" ]
  }

  # If the grok was successful process the timestamp and JSON payload.
  if "_grokparsefailure" not in [tags] {
    # Transform the JSON payload into event fields. If successful remove the payload field.
    json {
      skip_on_invalid_json => true
      source => "[payload]"
      remove_field => [ "[payload]" ]
    }
    
    # Set @timestamp based on datetime (the timestamp from the message). If successful remove the datetime field. Set the timezone as needed.
    date {
      match => [ "[datetime]", "YYYY-MM-dd HH:mm:ss,SSS" ]
      remove_field => [ "[datetime]" ]
      timezone => "UTC"
    }
  }
}

output {
  stdout {
    codec => rubydebug { }
  }
}

The resulting output is...

{
     "@timestamp" => 2020-03-15T05:13:50.979Z,
           "type" => "honeysap",
    "target_port" => 3299,
        "service" => "saprouter",
    "source_port" => 52183,
      "source_ip" => "192.168.5.76",
       "response" => "",
      "target_ip" => "0.0.0.0",
      "timestamp" => "2020-03-15 05:13:50.978092",
           "host" => "ws5",
        "request" => "AAAAQE5JX1JPVVRFAAIoAgAAAAEAAAAoAAAAFDE5Mi4xNjguNS4xMjcAMzI5OQAAMTAuMC4wLjEwMABzYXBkcDAxAAA=",
       "@version" => "1",
          "event" => "Received packet",
           "data" => "",
        "session" => "4ba57f1d-b5ad-4ebf-9ef9-35f9e8860723"
}

Since the JSON payload also includes a timestamp field, you could also do something simpler like...

filter {
  # Get rid of the logfile path if it isn't needed. And remove the non-JSON log prefix.
  mutate {
    remove_field => [ "path" ]
    gsub => [ "[message]", "^[^\{]+", "" ]
  }

  json {
    skip_on_invalid_json => true
    source => "[message]"
    remove_field => [ "[message]" ]
  }
  
  # Set @timestamp based on the timestamp field from the JSON payload. If successful remove the timestamp field. Set the timezone as needed.
  date {
    match => [ "[timestamp]", "ISO8601" ]
    remove_field => [ "[timestamp]" ]
    timezone => "UTC"
  }
}

This produces a similar output, without the extra timestamp...

{
           "type" => "honeysap",
       "@version" => "1",
      "target_ip" => "0.0.0.0",
          "event" => "Received packet",
           "data" => "",
       "response" => "",
        "session" => "4ba57f1d-b5ad-4ebf-9ef9-35f9e8860723",
     "@timestamp" => 2020-03-15T05:13:50.979Z,
        "service" => "saprouter",
        "request" => "AAAAQE5JX1JPVVRFAAIoAgAAAAEAAAAoAAAAFDE5Mi4xNjguNS4xMjcAMzI5OQAAMTAuMC4wLjEwMABzYXBkcDAxAAA=",
    "source_port" => 52183,
           "host" => "ws5",
      "timestamp" => "2020-03-15 05:13:50.978092",
      "source_ip" => "192.168.5.76",
    "target_port" => 3299
}

In some environments multiple timestamps are desired in order to record the time the event occurred, the time it was logged by the observing system, and the time it was received by the central log system. In ECS these are @timestamp, event.created, and event.ingested respectively.

You could further process the event data from this point in order to make it ECS compliant, which would allow you to use the SIEM app in Kibana.
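As a sketch of keeping two of those timestamps, building on the grok-based filter above (the ECS-style [event][created] target and the field wiring are illustrative, not tested against your data):

```
# @timestamp: when the event occurred, from the JSON payload's "timestamp" field.
date {
  match => [ "[timestamp]", "ISO8601" ]
  timezone => "UTC"
  remove_field => [ "[timestamp]" ]
}

# event.created: when the honeypot logged it, from the grok-extracted "datetime".
date {
  match => [ "[datetime]", "YYYY-MM-dd HH:mm:ss,SSS" ]
  target => "[event][created]"
  timezone => "UTC"
  remove_field => [ "[datetime]" ]
}
```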

Rob


Awesome, man, thanks a lot, and that did work!!

Thanks again.

Another option would be to use dissect rather than grok. Like this.
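For instance, a minimal sketch against the honeysap line above (the ?logger and ?level skip-field names are just placeholders; dissect splits on the literal " - " delimiters):

```
filter {
  dissect {
    mapping => {
      "message" => "%{?logger} - %{datetime} - %{?level} - %{payload}"
    }
  }
  json {
    skip_on_invalid_json => true
    source => "[payload]"
    remove_field => [ "[payload]" ]
  }
}
```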

Hi,

Somehow, using those parsers, I am unable to index my data in Elasticsearch. Any idea why?

[WARN ] 2020-03-18 18:48:35.675 [[main]>worker3] elasticsearch - Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"problox-2020.03.18", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x2da962da>], :response=>{"index"=>{"_index"=>"problox-2020.03.18", "_type"=>"_doc", "_id"=>"-tPL7XABjMrTL_9AtrF2", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [timestamp] of type [date] in document with id '-tPL7XABjMrTL_9AtrF2'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [2020-03-18 07:11:22.911641] with format [strict_date_optional_time||epoch_millis]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Failed to parse with all enclosed parsers"}}}}}}
Here is my final file:
input {
  # SAP Trap Internal
  #file {
  #  path => ["/var/log/data/saphp/honeysap-internal.log"]
  #  codec => json
  #  type => "saptrapinternal"
  #}

  # SAP Trap External
  #file {
  #  path => ["/var/log/data/saphp/honeysap-external.log"]
  #  codec => json
  #  type => "saptrapexternal"
  #}

  stdin {
    codec => json
    type => "saptrapexternal"
  }
}

filter {
  # SAP Internal
  if [type] == "saptrapinternal" {
    mutate {
      remove_field => [ "path" ]
      gsub => [ "[message]", "^[^\{]+", "" ]
      rename => {
        "source_ip"   => "src_ip"
        "target_port" => "dest_port"
        "source_port" => "src_port"
      }
    }
    json {
      skip_on_invalid_json => true
      source => "[message]"
      remove_field => [ "[message]" ]
    }
    # Set @timestamp based on the timestamp field from the JSON payload.
    # If successful remove the timestamp field. Set the timezone as needed.
    date {
      match => [ "[timestamp]", "ISO8601" ]
      remove_field => [ "[timestamp]" ]
      timezone => "UTC"
    }
  }

  ############
  # SAP External
  if [type] == "saptrapexternal" {
    mutate {
      remove_field => [ "path" ]
      gsub => [ "[message]", "^[^\{]+", "" ]
      rename => {
        "source_ip"   => "src_ip"
        "target_port" => "dest_port"
        "source_port" => "src_port"
      }
    }
    json {
      skip_on_invalid_json => true
      source => "[message]"
      remove_field => [ "[message]" ]
    }
    # Set @timestamp based on the timestamp field from the JSON payload.
    # If successful remove the timestamp field. Set the timezone as needed.
    date {
      match => [ "[timestamp]", "ISO8601" ]
      remove_field => [ "[timestamp]" ]
      timezone => "UTC"
    }
  }

  #### SAP Trap
  if [type] == "saptrapinternal" {
    mutate {
      add_field => { "trap_type" => "SAP-Internal" }
    }
  }

  if [type] == "saptrapexternal" {
    mutate {
      add_field => { "trap_type" => "SAP-Router" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["https://127.0.0.1:16577"]
    user => "xxxx"
    password => "XXXX"
    ssl => true
    ssl_certificate_verification => false
    template => "/etc/logstash/elasticsearch-template-es7x.json"
    ilm_enabled => false
    index => "problox-%{+YYYY.MM.dd}"
    #document_type => "doc"
  }

  #if [type] == "Suricata" {
  #  file {
  #    file_mode => 0770
  #    path => "/data/suricata/log/suricata_ews.log"
  #  }
  #}

  # Debug output
  #if [type] == "XYZ" {
  #  stdout {
  #    codec => rubydebug
  #  }
  #}

  # Debug output
  #stdout {
  #  codec => rubydebug
  #}
}

#output {stdout {}}

And my mutate filter is now working as well.

The strict_date_optional_time format only allows millisecond precision. If you need to retain microsecond precision you could use an index template with a custom format, or if milliseconds is OK use a date filter to parse that field before feeding it to elasticsearch (or mutate+gsub to throw away the last three characters of the field).

Hmmm... any example would be really appreciated. And do you mean cut out the 911641?

If you want an example of a custom format I suggest you ask in the elasticsearch forum. I don't run elasticsearch.

If you want to discard the last three characters you can use

mutate { gsub => [ "timestamp", "...$", "" ] }
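Combining that with a date filter might look like this (a sketch; after the gsub the field is down to millisecond precision, matching the SSS pattern):

```
filter {
  # "2020-03-18 07:11:22.911641" -> "2020-03-18 07:11:22.911"
  mutate { gsub => [ "timestamp", "...$", "" ] }
  date {
    match => [ "[timestamp]", "yyyy-MM-dd HH:mm:ss.SSS" ]
    timezone => "UTC"
  }
}
```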