Hi everyone,
I'm new to the ELK stack and I've been stuck for two days on a problem that I think is caused by the kv plugin.
I need to ingest logs from our Fortimail. Here is a sample line from my Fortimail log:
<42>date=2019-05-22 time=16:29:16 device_id=0000000000101010 log_id=0200017726 type=statistics pri=information session_id="x8MdThfh6721-xTddfrEDdEZ0177" client_name="[0.0.0.0]" dst_ip="0.0.0.0" from="myemail@domain.us" to="recipient@domain.us" polid="21:0:1" domain="domain.us" subject="test" mailer="mta" resolved="FAIL" direction="in" virus="" disposition="Quarantine to Review" classifier="FortiGuard AntiSpam-IP" detail='Bulk' message_length="10342"
To parse this message I currently use the grok and kv plugins.
This is my test configuration:
input { stdin { } }
filter {
  grok {
    #break_on_match => false
    match => [ "message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}" ]
    overwrite => [ "message" ]
  }
  kv {
    source => "message"
    field_split => " "
    value_split => "="
  }
}
output { stdout { codec => rubydebug } }
Then I test the configuration with the command below; the output follows it:
cat test.txt | sudo -u logstash /usr/share/logstash/bin/logstash -f 06-test.conf
{
"syslog5424_pri" => "42",
"disposition" => "Quarantine to Review",
"dst_ip" => "0.0.0.0",
"detail" => "Bulk",
"mailer" => "mta",
"polid" => "21:0:1",
"client_name" => "[0.0.0.0]",
"domain" => "domain.us",
"date" => "2019-05-22",
"message" => "date=2019-05-22 time=16:29:16 device_id=0000000000101010 log_id=0200017726 type=statistics pri=information session_id=\"x8MdThfh6721-xTddfrEDdEZ0177\" client_name=\"[0.0.0.0]\" dst_ip=\"0.0.0.0\" from=\"myemail@domain.us\" to=\"recipient@domain.us\" polid=\"21:0:1\" domain=\"domain.us\" subject=\"test\" mailer=\"mta\" resolved=\"FAIL\" direction=\"in\" virus=\"\" disposition=\"Quarantine to Review\" classifier=\"FortiGuard AntiSpam-IP\" detail='Bulk' message_length=\"10342\"",
"from" => "myemail@domain.us",
"classifier" => "FortiGuard AntiSpam-IP",
"device_id" => "0000000000101010",
"log_id" => "0200017726",
"message_length" => "10342",
"subject" => "test",
"direction" => "in",
"resolved" => "FAIL",
"host" => "elk-dev",
"pri" => "information",
"@version" => "1",
"session_id" => "x8MdThfh6721-xTddfrEDdEZ0177",
"@timestamp" => 2019-05-23T07:41:54.012Z,
"type" => "statistics",
"to" => "recipient@domain.us",
"syslog_index" => "<42>",
"time" => "16:29:16"
}
So everything looks fine to me; it works.
My problem starts when I deploy this configuration in production: my Elasticsearch index stays empty. Logstash doesn't send any logs to Elasticsearch.
I checked the Logstash log and everything seems to be working; I see no errors. I think the problem comes from the kv plugin, because if I remove the kv block from the filter, all my Fortimail logs are sent to Elasticsearch and I can visualize them in Kibana.
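Since removing the kv block is the only change that makes events arrive, one experiment that might narrow this down is to keep kv but store its keys under a separate field with the filter's `target` option, so that parsed keys cannot replace fields that already exist on the event. This is a minimal sketch, not a confirmed fix; the container name `fortimail` is an arbitrary choice of mine:

```conf
kv {
  source => "message"
  field_split => " "
  value_split => "="
  # Assumption: "fortimail" is only an illustrative name for the container field
  target => "fortimail"
}
```

With this, a key such as `log_id` would show up as `[fortimail][log_id]` instead of as a top-level `log_id`.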
This is my production configuration:
input {
  udp {
    port => 12345
    type => "syslog"
    id => "fortimail_syslog_udp"
  }
  tcp {
    port => 12345
    type => "syslog"
    id => "fortimail_syslog_tcp"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      #break_on_match => false
      match => [ "message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    kv {
      source => "message"
      field_split => " "
      value_split => "="
    }
  }
}
output {
  if [type] == "syslog" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-%{+dd.MM.YYYY}"
    }
  }
}
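To see what actually reaches the output stage, a temporary debugging variant of the output section could print every event regardless of the conditional. That would show exactly which fields each event carries after the kv filter has run, including `type`, which the conditional tests. This is only a troubleshooting sketch, reusing the stdout/rubydebug output from my test configuration:

```conf
output {
  # Temporary: print every event, outside the conditional, to inspect its fields
  stdout { codec => rubydebug }

  if [type] == "syslog" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-%{+dd.MM.YYYY}"
    }
  }
}
```

If Logstash runs as a service, this output goes wherever the service's standard output is captured.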
So I'm really lost, and I don't know where the problem is...
I would really appreciate your help if you have already encountered this problem.
For information, my ELK stack is at version 7.0 and I have updated all Logstash plugins (so kv is at version 4.3.1).
Have a good day,
Thanks in advance.