I used a combination of grok and json filters to grab the timestamp from the message and the main payload, which is JSON formatted.
input {
  file {
    path => "PATH/TO/honeysap.log"
    codec => plain {
      charset => "ISO-8859-1"
    }
    type => "honeysap"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  # Get rid of the logfile path if it isn't needed.
  mutate {
    remove_field => [ "path" ]
  }
  # Extract the timestamp and JSON payload. If successful, remove the original message.
  grok {
    match => {
      "[message]" => "honeysap\.events\.logfeed%{SPACE}-%{SPACE}(?<datetime>20[0-3][0-9]-[0-1][0-9]-[0-3][0-9] [0-2][0-9]:[0-6][0-9]:[0-6][0-9],[0-9][0-9][0-9])%{SPACE}-%{SPACE}EVENT%{SPACE}-[^\{]+%{GREEDYDATA:payload}"
    }
    remove_field => [ "[message]" ]
  }
  # If the grok was successful, process the timestamp and JSON payload.
  if "_grokparsefailure" not in [tags] {
    # Transform the JSON payload into event fields. If successful, remove the payload field.
    json {
      skip_on_invalid_json => true
      source => "[payload]"
      remove_field => [ "[payload]" ]
    }
    # Set @timestamp based on datetime (the timestamp from the message). If successful, remove the datetime field. Set the timezone as needed.
    date {
      match => [ "[datetime]", "YYYY-MM-dd HH:mm:ss,SSS" ]
      remove_field => [ "[datetime]" ]
      timezone => "UTC"
    }
  }
}
output {
  stdout {
    codec => rubydebug { }
  }
}
The resulting output is...
{
     "@timestamp" => 2020-03-15T05:13:50.979Z,
           "type" => "honeysap",
    "target_port" => 3299,
        "service" => "saprouter",
    "source_port" => 52183,
      "source_ip" => "192.168.5.76",
       "response" => "",
      "target_ip" => "0.0.0.0",
      "timestamp" => "2020-03-15 05:13:50.978092",
           "host" => "ws5",
        "request" => "AAAAQE5JX1JPVVRFAAIoAgAAAAEAAAAoAAAAFDE5Mi4xNjguNS4xMjcAMzI5OQAAMTAuMC4wLjEwMABzYXBkcDAxAAA=",
       "@version" => "1",
          "event" => "Received packet",
           "data" => "",
        "session" => "4ba57f1d-b5ad-4ebf-9ef9-35f9e8860723"
}
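If it helps to sanity-check the parsing logic outside Logstash, here is a rough Python equivalent of the grok, json, and date steps above. The sample log line is an assumption reconstructed from the parsed output, not a real HoneySAP log line:

```python
import json
import re
from datetime import datetime

# Assumed sample line, reconstructed from the rubydebug output above.
line = ('honeysap.events.logfeed - 2020-03-15 05:13:50,979 - EVENT - '
        '{"event": "Received packet", "service": "saprouter", '
        '"timestamp": "2020-03-15 05:13:50.978092"}')

# Python re equivalent of the grok pattern (%{SPACE} ~ \s*, %{GREEDYDATA} ~ .*).
pattern = re.compile(
    r'honeysap\.events\.logfeed\s*-\s*'
    r'(?P<datetime>20[0-3][0-9]-[0-1][0-9]-[0-3][0-9] '
    r'[0-2][0-9]:[0-6][0-9]:[0-6][0-9],[0-9][0-9][0-9])'
    r'\s*-\s*EVENT\s*-[^{]+(?P<payload>.*)')

m = pattern.match(line)
event = json.loads(m.group('payload'))  # roughly what the json filter does
# roughly what the date filter does with "YYYY-MM-dd HH:mm:ss,SSS"
ts = datetime.strptime(m.group('datetime'), '%Y-%m-%d %H:%M:%S,%f')
```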
Since the JSON payload also includes a timestamp field, you could also do something simpler like...
filter {
  # Get rid of the logfile path if it isn't needed, and remove the non-JSON log prefix.
  mutate {
    remove_field => [ "path" ]
    gsub => [ "[message]", "^[^\{]+", "" ]
  }
  json {
    skip_on_invalid_json => true
    source => "[message]"
    remove_field => [ "[message]" ]
  }
  # Set @timestamp based on the timestamp field from the JSON payload. If successful, remove the timestamp field. Set the timezone as needed.
  date {
    match => [ "[timestamp]", "ISO8601" ]
    remove_field => [ "[timestamp]" ]
    timezone => "UTC"
  }
}
This produces a similar output, without the extra timestamp...
{
           "type" => "honeysap",
       "@version" => "1",
      "target_ip" => "0.0.0.0",
          "event" => "Received packet",
           "data" => "",
       "response" => "",
        "session" => "4ba57f1d-b5ad-4ebf-9ef9-35f9e8860723",
     "@timestamp" => 2020-03-15T05:13:50.979Z,
        "service" => "saprouter",
        "request" => "AAAAQE5JX1JPVVRFAAIoAgAAAAEAAAAoAAAAFDE5Mi4xNjguNS4xMjcAMzI5OQAAMTAuMC4wLjEwMABzYXBkcDAxAAA=",
    "source_port" => 52183,
           "host" => "ws5",
      "timestamp" => "2020-03-15 05:13:50.978092",
      "source_ip" => "192.168.5.76",
    "target_port" => 3299
}
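Again as a rough Python sketch (same assumed sample line as before), the simpler version boils down to stripping the non-JSON prefix and trusting the embedded timestamp:

```python
import json
import re
from datetime import datetime

# Assumed sample line, reconstructed from the rubydebug output.
line = ('honeysap.events.logfeed - 2020-03-15 05:13:50,979 - EVENT - '
        '{"event": "Received packet", "timestamp": "2020-03-15 05:13:50.978092"}')

# What the gsub does: drop everything before the first '{'.
payload = re.sub(r'^[^{]+', '', line)
event = json.loads(payload)  # roughly what the json filter does
# The embedded timestamp is ISO8601-like with a space separator,
# which fromisoformat() accepts.
ts = datetime.fromisoformat(event['timestamp'])
```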
In some environments multiple timestamps are desired in order to record the time the event occurred, the time it was logged by the observing system, and the time it was received by the central log system. In ECS these are @timestamp, event.created, and event.ingested, respectively.
You could further process the event data from this point in order to make it ECS compliant, which would allow you to use the SIEM app in Kibana.
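As a rough sketch of what part of that ECS mapping could look like, here it is in Python. The flat-to-nested mapping is my guess at a sensible layout; source.ip, source.port, destination.ip, destination.port, and event.action are real ECS field names:

```python
# Hypothetical flat-to-ECS field mapping for the honeysap fields above.
ECS_RENAMES = {
    "source_ip": ("source", "ip"),
    "source_port": ("source", "port"),
    "target_ip": ("destination", "ip"),
    "target_port": ("destination", "port"),
    "event": ("event", "action"),
}

def to_ecs(doc: dict) -> dict:
    """Move flat honeysap fields into ECS-style nested objects."""
    out = {k: v for k, v in doc.items() if k not in ECS_RENAMES}
    for flat, (obj, field) in ECS_RENAMES.items():
        if flat in doc:
            out.setdefault(obj, {})[field] = doc[flat]
    return out

to_ecs({"source_ip": "192.168.5.76", "target_port": 3299, "host": "ws5"})
```

In Logstash itself the same renames could be done with a mutate filter's rename option.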
Rob