Sending text log files using Logstash to Kafka

Hi!
I have a question about the correct setup of Logstash so that it can send plain text log files to a Kafka cluster in JSON format.

In my logstash.conf I have:
input {
  file {
    path => "/app/server/default/logs/audit.log"
    codec => "json"
    tags => ["server_one_tag1"]
  }
}

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:EventTime}%{SPACE}%{WORD:TimeZone}%{SPACE}(%{WORD:MWSLogType}:%{WORD:Severity})%{SPACE}%{SPACE}%-%{SPACE}%{GREEDYDATA:LogMessage}" }
    add_tag => ["server_one_tag2"]
  }
}

output {
  kafka {
    bootstrap_servers => "kafka.clusters.com:9092"
    topic_id => ["XYZPREPROD"]
  }
}

A line from my log file that Logstash picks up looks like this:
2018-03-08 19:21:01 CET (Audit:INFO) - DeleteEvent; timestamp=1520527979786; username=system/someuser; operation=Item deleted; status=success; relation_is_from=true; is_deliverable=true; remove=/meta/default/task/03000027472; deleted=/meta/default/task/00000327472

After I start Logstash with debug logging, the output contains the following:
[2018-07-03T10:19:13,101][DEBUG][logstash.inputs.file ] each: file grew: /app/server/default/logs/audit.log: old size 10292413, new size 10292575
[2018-07-03T10:19:13,105][DEBUG][logstash.inputs.file ] Received line {:path=>"/app/server/default/logs/audit.log", :text=>"2018-07-03 10:19:12 CEST (Audit:INFO) - LoginFailedEvent; ip-address=12.34.56.789; timestamp=1530605952100; failed_login_user=Administrator; action=Failed Login"}
[2018-07-03T10:19:13,122][ERROR][logstash.codecs.json ] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: (String)"2018-07-03 10:19:12 CEST (Audit:INFO) - LoginFailedEvent; ip-address=10.30.58.139; timestamp=1530605952100; failed_login_user=Administrator; action=Failed Login"; line: 1, column: 6]>, :data=>"2018-07-03 10:19:12 CEST (Audit:INFO) - LoginFailedEvent; ip-address=12.34.56.789; timestamp=1530605952100; failed_login_user=Administrator; action=Failed Login"}
[2018-07-03T10:19:13,124][DEBUG][logstash.util.decorators ] inputs/LogStash::Inputs::File: adding tag {"tag"=>"server_one_tag1"}
[2018-07-03T10:19:13,125][DEBUG][logstash.inputs.file ] writing sincedb (delta since last write = 83)
[2018-07-03T10:19:13,240][DEBUG][logstash.pipeline ] filter received {"event"=>{"@version"=>"1", "host"=>"prisma", "message"=>"2018-07-03 10:19:12 CEST (Audit:INFO) - LoginFailedEvent; ip-address=12.34.56.789; timestamp=1530605952100; failed_login_user=Administrator; action=Failed Login", "@timestamp"=>2018-07-03T08:19:13.123Z, "tags"=>["_jsonparsefailure", "server_one_tag1"], "path"=>"/app/server/default/logs/audit.log"}}
[2018-07-03T10:19:13,242][DEBUG][logstash.filters.grok ] Running grok filter {:event=>#LogStash::Event:0x5104ad89}
[2018-07-03T10:19:13,243][DEBUG][logstash.filters.grok ] Event now: {:event=>#LogStash::Event:0x5104ad89}
[2018-07-03T10:19:13,244][DEBUG][logstash.pipeline ] output received {"event"=>{"@version"=>"1", "host"=>"prisma", "message"=>"2018-07-03 10:19:12 CEST (Audit:INFO) - LoginFailedEvent; ip-address=12.34.56.789; timestamp=1530605952100; failed_login_user=Administrator; action=Failed Login", "@timestamp"=>2018-07-03T08:19:13.123Z, "tags"=>["_jsonparsefailure", "server_one_tag1", "_grokparsefailure"], "path"=>"/app/server/default/logs/audit.log"}}

Output in Kibana looks like this:


So what I want to achieve:
In Kibana, instead of "_jsonparsefailure", I want to see fields same way as I divided them in grok section. Also, I want to add some tags, like "cluster1" etc as I have several applications that will write under same topic in Kafka. But nevertheless I add "tags", they never appear in Kibana.

There's something fishy going on here. Are you sure this is the configuration you're actually using? You have tags => ["server_one_tag1"] in your input yet that tag is never present when the event reaches Elasticsearch. There's also no sign that the grok filter gets any opportunity to process your events.

Unrelated to the problem I'm reasoning about above, but remove codec => json from your file input. The input file is clearly not JSON.
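With the json codec gone, the file input falls back to its default plain codec, so each line of the log becomes one event with the raw text in the message field, which is what grok needs to work on. A minimal sketch of what that input block could then look like, reusing the path and tag from the config above:

input {
  file {
    # No json codec: with the default plain codec each line of audit.log
    # becomes one event, with the raw text in the "message" field.
    path => "/app/server/default/logs/audit.log"
    tags => ["server_one_tag1"]
  }
}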

Yes, maybe it is fishy.
So how can I understand why grok is not processing my events? Also, regarding what I want to achieve: my output should be in JSON format so that Kafka can receive my events correctly.

So how can I understand why grok is not processing my events?

Simplify your setup. Are you 100% sure Logstash is using the configuration you think it's using? Starting Logstash with debug-level logging makes it log the loaded configuration. Skip Kafka for now and just use a simple stdout { codec => rubydebug } output to rule out one potential error source.
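For reference, that advice could look roughly like this: verify which config file Logstash actually loads, start it with debug logging, and run a stripped-down pipeline whose only output is rubydebug. The command-line flags below are standard Logstash options; the config path is a placeholder for wherever logstash.conf really lives.

# Validate the config, then run with debug logging:
#   bin/logstash -f /path/to/logstash.conf --config.test_and_exit
#   bin/logstash -f /path/to/logstash.conf --log.level=debug

input {
  file {
    path => "/app/server/default/logs/audit.log"
    tags => ["server_one_tag1"]
  }
}
# No filter yet: first confirm that events arrive at all,
# then add the grok filter back in one step at a time.
output {
  stdout { codec => rubydebug }
}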

OK, I commented out the kafka lines and put rubydebug there. Many thanks for the suggestion.
As I can see, something is going wrong there:

[2018-07-03T13:56:27,879][ERROR][logstash.codecs.json ] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: (String)"2018-07-03 13:56:27 CEST (Audit:INFO) - LoginEvent; ip-address=12.34.558.789; timestamp=1530618987526; username=system/administrator; action=Logged In"; line: 1, column: 6]>, :data=>"2018-07-03 13:56:27 CEST (Audit:INFO) - LoginEvent; ip-address=10.30.58.139; timestamp=1530618987526; username=system/administrator; action=Logged In"}
{
    "path" => "/app/server/default/logs/audit.log",
    "tags" => [
        [0] "_jsonparsefailure",
        [1] "server_one_tag1",
        [2] "_grokparsefailure"
    ],
    "@timestamp" => 2018-07-03T11:56:27.891Z,
    "message" => "2018-07-03 13:56:27 CEST (Audit:INFO) - LoginEvent; ip-address=10.30.58.139; timestamp=1530618987526; username=system/administrator; action=Logged In",
    "host" => "prisma",
    "@version" => "1"
}

As I said, remove codec => json from your file input. Then look into why your grok expression isn't matching the input.
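A common way to track down a non-matching grok expression is to shrink the pattern to the smallest piece that should match (here, the leading timestamp) and then extend it one token at a time against a known sample line, either in a throwaway pipeline like the one above or in Kibana's Grok Debugger if it is available. A sketch of such an incremental filter; the custom failure tag is made up for illustration:

filter {
  grok {
    # Start with only the timestamp. Once this matches the sample line,
    # append %{SPACE}%{WORD:TimeZone}, then the severity part, and so on.
    match => { "message" => "%{TIMESTAMP_ISO8601:EventTime}" }
    # A distinct failure tag makes it obvious in the rubydebug output
    # which version of the pattern stopped matching.
    tag_on_failure => ["_grok_eventtime_failed"]
  }
}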

I have already removed the json codec; for now I am not able to understand why my grok is not being applied. As you suggested in other threads, I applied just one grok expression.
My config:

input {
file {
path => "/app/server/default/logs/audit.log"
tags => ["234","server_one_tag1"]

filter {
if [tag] == 'server_one_tag1'{
}}
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:EventTime}" }
add_tag => ["server_one_tag2"]
remove_field => ["message"]
add_field => ["test_field","%{EventTime}"]
}}
}
output {
stdout { codec => rubydebug }
}

The output is the following:
[2018-07-04T08:36:05,190][DEBUG][logstash.pipeline ] filter received {"event"=>{"message"=>"2018-07-04 08:36:04 CEST (Audit:INFO) - LoginEvent; ip-address=12.345.67.890; timestamp=1530686164820; username=LDAP/chanmend; action=Logged In", "host"=>"prisma", "@version"=>"1", "path"=>"/app/server/default/logs/audit.log", "tags"=>["234", "server_one_tag1"], "@timestamp"=>2018-07-04T06:36:05.086Z}}
[2018-07-04T08:36:05,190][DEBUG][logstash.pipeline ] output received {"event"=>{"message"=>"2018-07-04 08:36:04 CEST (Audit:INFO) - LoginEvent; ip-address=12.345.67.890; timestamp=1530686164820; username=LDAP/chanmend; action=Logged In", "host"=>"prisma", "@version"=>"1", "path"=>"/app/server/default/logs/audit.log", "tags"=>["234", "server_one_tag1"], "@timestamp"=>2018-07-04T06:36:05.086Z}}
{
    "message" => "2018-07-04 08:36:04 CEST (Audit:INFO) - LoginEvent; ip-address=12.345.67.890; timestamp=1530686164820; username=LDAP/chanmend; action=Logged In",
    "host" => "prisma",
    "@version" => "1",
    "path" => "/app/server/default/logs/audit.log",
    "tags" => [
        [0] "234",
        [1] "server_one_tag1"
    ],
    "@timestamp" => 2018-07-04T06:36:05.086Z
}
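As an aside for anyone reading along: in the config as posted above, the braces don't line up. The file input is never closed before filter begins, and the if block is closed with }} straight away, so the grok filter most likely never ends up inside a working filter section, which would explain why it leaves no trace in the output. The conditional also tests a [tag] field that doesn't exist; tags is an array, so membership is checked with in [tags]. A balanced, tag-gated version of that config would look roughly like this sketch:

input {
  file {
    path => "/app/server/default/logs/audit.log"
    tags => ["234","server_one_tag1"]
  }
}
filter {
  # Conditionals test membership in the tags array, not a [tag] field.
  if "server_one_tag1" in [tags] {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:EventTime}" }
      add_tag => ["server_one_tag2"]
      add_field => { "test_field" => "%{EventTime}" }
      remove_field => ["message"]
    }
  }
}
output {
  stdout { codec => rubydebug }
}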

OK, I was finally able to figure it out. There are no errors etc. anymore.

This is again a line from my text log file, which is now being sent on to Kibana, where the JSON format represents it perfectly:
2018-03-08 19:21:01 CET (Audit:INFO) - DeleteEvent; timestamp=1520527979786; username=system/someuser; operation=Item deleted; status=success; relation_is_from=true; is_deliverable=true; remove=/meta/default/task/00w0027472; deleted=/meta/default/task/00b0027472

So the correct config would be as follows (this works perfectly with the json codec and an otherwise default Logstash setup):

input {
  file {
    path => "/app/server/default/logs/audit.log"
    tags => ["xxx","yyy"]
    type => "mws_audit_log"
  }
}
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:EventTime}%{SPACE}%{WORD:TimeZone}%{SPACE}(%{WORD:MWSLogType}:%{WORD:Severity})%{SPACE}%{GREEDYDATA:LogMessage}" }
    remove_field => ["message"]
  }
}
output {
  kafka {
    codec => "json"
    bootstrap_servers => "my.kafka.com:9092"
    topic_id => ["SOMETOPIC"]
  }
}

That's all.
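As a possible follow-up for the other half of the original question (telling several applications apart when they all write to the same Kafka topic): one way to extend the working config above is to give each application its own file input with its own tags, and optionally to split the key=value pairs inside LogMessage into separate fields with the kv filter. The cluster names and the second path below are placeholders, and the kv part goes beyond what was discussed in this thread:

input {
  file {
    path => "/app/server/default/logs/audit.log"
    tags => ["cluster1"]
    type => "mws_audit_log"
  }
  file {
    # Hypothetical second application feeding the same topic.
    path => "/app/other_server/default/logs/audit.log"
    tags => ["cluster2"]
    type => "mws_audit_log"
  }
}
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:EventTime}%{SPACE}%{WORD:TimeZone}%{SPACE}(%{WORD:MWSLogType}:%{WORD:Severity})%{SPACE}%{GREEDYDATA:LogMessage}" }
    remove_field => ["message"]
  }
  # Optional: turn "timestamp=...; username=...; status=success" inside
  # LogMessage into individual fields.
  kv {
    source => "LogMessage"
    field_split => ";"
    value_split => "="
    trim_key => " "
  }
}
output {
  kafka {
    codec => "json"
    bootstrap_servers => "my.kafka.com:9092"
    topic_id => ["SOMETOPIC"]
  }
}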
