Filter for Kafka and Zookeeper System Logs

Hi all - I'm new to elastic and grok. I'm trying to get Kafka/Zookeeper system logs filtered and parsed by Logstash (shipped by filebeat), then stored in Elasticsearch, so that they may be viewable on Kibana all on CentOS 7.

I imagine I don't need the Kafka plugin as I'm not interested in Kafka topics, only the system related logs (controller.log, system.log, zookeeper.log, etc).

Can I use grok to filter these logs? What would a good starting point for a filter like this be?

Example log:
[2018-03-15 22:27:31,877] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)

Current input:

input {
beats {
port => 5044

Current output:

output {
elasticsearch {
hosts => ["localhost:9200"]
sniffing => true
manage_template => false
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"

This input and output works for /var/log/messages and /var/log/secure. Any advice will be helpful, thank you.

Caveat: it's pretty tough to come up with a generic pattern that will match many messages when we're only given a single example, but I hope this is enough to get you started.

You could use the grok constructor, but for a log message this simple I would probably start with the dissect filter, followed by the date filter to set the event's timestamp:

filter {
  dissect {
    "mapping" => {
      "message" => "[%{[@metadata][ts]}] %{level} %{message}"
  date {
    "match" => ["[@metadata][ts]", "yyyy-MM-dd HH:mm:ss,SSS"]

Hey - I appreciate the reply! It's definitely enough to get me started, thank you for posting. I'll start with this and go from there.

I've created a filter "20-filter-kafka" with the above pattern, however, nothing is coming in that I can see on Kibana.

To simplify the process, I've reduced my output file to:

output {
elasticsearch {
hosts => ["localhost:9200"]

Filebeat.yml prospectors:


Each - is a prospector. Most options can be set at the prospector level, so
you can use different prospectors for various configurations.
Below are the prospector specific configurations.

  • type: log

    Change to true to enable this prospector configuration.
    enabled: true

    Paths that should be crawled and fetched. Glob based paths.

    • /kafka_2.11-1.0.0/logs/server.log

In Kibana my index pattern is:


However, I have tried:

Both with a time filter field name log_stamp


[2018-03-23T11:37:57,704][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[2018-03-23T11:37:57,712][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[2018-03-23T11:37:57,970][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-03-23T11:37:58,093][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.2"}
[2018-03-23T11:37:58,275][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-23T11:37:58,986][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-23T11:37:59,456][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-03-23T11:37:59,480][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-03-23T11:37:59,652][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2018-03-23T11:37:59,708][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2018-03-23T11:37:59,709][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-03-23T11:37:59,714][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-03-23T11:37:59,716][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-03-23T11:37:59,727][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2018-03-23T11:38:00,196][INFO ][ ] Beats inputs: Starting input listener {:address=>""}
[2018-03-23T11:38:00,373][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2c3a9acf@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
[2018-03-23T11:38:00,452][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-03-23T11:38:00,469][INFO ][] Starting server on port: 5044

I've had my ELK stack working successfully with other filters so I think my connection issue is fine. /var/log/logstash/logstash-plain.log seems normal. Is there something I can paste or check that will help us figure out what's wrong?

I appreciate the help! Thank you!

-edit- It now works. I needed to create a new index pattern with logstash-*. I'm not sure if it was always there, but I didn't see it before. I was hopeful by making a "*index pattern would also allow the logs to show but it did not. Regardless, I've got something to work with now and for that, thank you very much for your reply earlier.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.