Ship Kafka logs

Hello, can anyone please give me some info or point me to the docs on how to ship Kafka logs to ELK? I checked a few guides, but they did not work out. I also read that it is slightly different from the other server logs that we ship.

Thanks!

I have been trying to use the Logstash config below, but I get this error:

[2019-09-24T13:39:47,479][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, -, ", ', } at line 12, column 23 (byte 172) after filter{\n if ([fields][application] == "KafkaDevInt") {\n grok {\n match => {"}

Logstash config:

input {
 beats {
     port => 5044
 }
 beats {
     port => 5045
 }
}
filter{
   if ([fields][application] == "KafkaDevInt") {
       grok {
           match => {%{SYSLOG5424SD} %{LOGLEVEL} %{GREEDYDATA:message}}
      }
   }
}
output {
 elasticsearch {
     hosts => "localhost:9200"
 }
}

Kafka logs are in the format below:

[2019-09-22 13:38:04,227] TRACE [Controller id=0 epoch=1] Received response {error_code=0,partitions=[{topic=treds,partition=0,error_code=0}]} for a request sent to broker IP:PORT (id: 0 rack: null) (state.change.logger)

Can someone help me here, please?

Thanks!

That should be more like

grok { match => { "message" => "pattern" } }
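
For your Kafka log line, that might look something like the sketch below (assuming the Beats input delivers the raw line in the message field; the kafka_timestamp and kafka_message field names are just examples, not required names):

    grok {
        # capture the bracketed timestamp, the log level, and the rest of the line
        match => { "message" => "%{SYSLOG5424SD:kafka_timestamp} %{LOGLEVEL:loglevel} %{GREEDYDATA:kafka_message}" }
    }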

Thanks, will try that out and update here.

I'm getting "The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information." error . Below are the logs. I have also referred to https://www.elastic.co/guide/en/logstash/5.6/shutdown.html using the flag but that did not help. Please help!

[2019-09-26T09:42:15,565][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{"other"=>[{"thread_id"=>26, "name"=>"[main]<beats", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-beats-3.1.30-java/lib/logstash/inputs/beats.rb:209:in `run'"}, {"thread_id"=>27, "name"=>"[main]<beats", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-beats-3.1.30-java/lib/logstash/inputs/beats.rb:209:in `run'"}, {"thread_id"=>21, "name"=>"[main]>worker0", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:112:in `lock'"}, {"thread_id"=>22, "name"=>"[main]>worker1", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:112:in `lock'"}, {"thread_id"=>23, "name"=>"[main]>worker2", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:112:in `lock'"}, {"thread_id"=>24, "name"=>"[main]>worker3", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:112:in `lock'"}]}}
[2019-09-26T09:42:15,567][ERROR][logstash.shutdownwatcher ] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2019-09-26T09:42:20,546][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{"other"=>[{"thread_id"=>27, "name"=>"[main]<beats", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-beats-3.1.30-java/lib/logstash/inputs/beats.rb:209:in `run'"}, {"thread_id"=>21, "name"=>"[main]>worker0", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:138:in `lock'"}, {"thread_id"=>22, "name"=>"[main]>worker1", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:138:in `lock'"}, {"thread_id"=>23, "name"=>"[main]>worker2", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:112:in `lock'"}, {"thread_id"=>24, "name"=>"[main]>worker3", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:138:in `lock'"}]}}

Hello, I'm able to ship the data after some troubleshooting, but I see that the logs are not appearing with the timestamp at which they occurred. For example, in the screenshot below, if you look at the 'message' field, the event occurred on 24th September, but "@timestamp" does not show 24th September. Can you tell me how to fix this, please?

To add to that, I'm using the grok pattern below:

grok {
    match => { "actual_message" => "%{SYSLOG5424SD} %{LOGLEVEL} %{GREEDYDATA}" }
}

Log format:

[2019-09-24 12:54:11,546] DEBUG [Controller id=0] Topics not in preferred replica Map() (kafka.controller.KafkaController)
[2019-09-24 12:54:11,546] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)

That grok doesn't add any fields to the message. You need to name them. Then use a date filter to parse the timestamp.

    grok { match => {"actual_message" => "%{SYSLOG5424SD:[@metadata][date]} %{LOGLEVEL} %{GREEDYDATA}"} }
    date { match => [ "[@metadata][date]", "[YYYY-MM-dd HH:mm:ss,SSS]" ] }
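
Fields under [@metadata] are not sent to the output, so the temporary date field will not end up as an extra field in Elasticsearch.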

Thanks! How do I get '@timestamp' to be in GMT? In the 'message' field the log shows GMT, but '@timestamp' is picking up the machine's local time. Also, I'm trying to ship all the old logs as well, but they all show up under today's date. I do not understand what is causing this.

Use the timezone option on the date filter.

I tried using timezone => "UTC" but I still see the same:

Below is my logstash config:

input {
  beats {
      port => 5044
  }
}
filter {
  if ([fields][application] == "KafkaDevInt") {
      grok {
          match => { "actual_message" => "%{SYSLOG5424SD:[@metadata][date]} %{LOGLEVEL} %{GREEDYDATA}" }
      }
      date {
          match    => [ "[@metadata][date]", "[YYYY-MM-dd HH:mm:ss,SSS]" ]
          timezone => "UTC"
      }
  }
}
output {
  elasticsearch {
      hosts => "localhost:9200"
  }
}

This is really weird: with the same config, I sometimes see no logs shipping at all, and no errors anywhere. The config test also reports fine. Please help me sort this out.

The timezone option tells the date filter what timezone the time in the log entry is in, not what timezone you want it converted to. It always converts to UTC.
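
For example, if the broker happened to write its log in local time rather than GMT, you would point the filter at that zone (America/New_York here is just a placeholder for whatever zone the broker actually logs in):

    date {
        match    => [ "[@metadata][date]", "[YYYY-MM-dd HH:mm:ss,SSS]" ]
        timezone => "America/New_York"   # timezone the log entries were written in, not the display timezone
    }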

But the timezone is always UTC in the logs. Wondering why it is not displayed as UTC in @timestamp?

In Kibana, can you click on an event and then copy and paste all the fields from the JSON tab?

Please do not post pictures of text, just post the text.

OK, I removed the image. Kindly check the log lines below:

[2019-09-28T09:35:10,665][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2019-09-28T09:35:10,694][INFO ][logstash.pipeline        ] Pipeline main started
[2019-09-28T09:35:10,702][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2019-09-28T09:35:10,735][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2019-09-28T10:31:11,263][WARN ][logstash.runner          ] SIGTERM received. Shutting down the agent.
[2019-09-28T10:31:11,267][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}
[2019-09-28T10:31:16,301][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{"other"=>[{"thread_id"=>26, "name"=>"[main]<beats", "current_call"=>"[...]/vendor/bundle/jruby/1.9/gems/logstash-input-beats-3.1.32-java/lib/logstash/inputs/beats.rb:209:in `run'"}, {"thread_id"=>21, "name"=>"[main]>worker0", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:138:in `lock'"}, {"thread_id"=>22, "name"=>"[main]>worker1", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:112:in `lock'"}, {"thread_id"=>23, "name"=>"[main]>worker2", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:138:in `lock'"}, {"thread_id"=>24, "name"=>"[main]>worker3", "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:138:in `lock'"}]}}
[2019-09-28T10:31:16,302][ERROR][logstash.shutdownwatcher ] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2019-09-28T10:31:24,890][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[2019-09-28T10:31:24,893][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[2019-09-28T10:31:25,402][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2019-09-28T10:31:25,404][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2019-09-28T10:31:25,496][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2019-09-28T10:31:25,531][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2019-09-28T10:31:25,534][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2019-09-28T10:31:25,538][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2019-09-28T10:31:25,590][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2019-09-28T10:31:25,976][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}

I tried using the flag mentioned here - https://www.elastic.co/guide/en/logstash/current/shutdown.html - but I still see this error every now and then.

I do not understand why the shutdown process would stall when there are 0 in-flight events.

How do I work around that?

I'm stuck here; kindly provide some suggestions on how to fix these errors. Let me know if you need any further info from me.