Problem sending logs from Filebeat to Logstash when I add Kafka to the pipeline

I seem to be having a problem sending logs from Filebeat to Logstash via Kafka, and I can't figure out why.
Sending them directly from Filebeat to Logstash works fine, but as soon as I add Kafka to the pipeline, the logs stop arriving.
Here is my Filebeat config file (the Kafka output section):

output.kafka:
  # The Kafka hosts
  hosts: ["xxx.232:9092"]
  topic: 'logs'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
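
As a sanity check, the config can also be validated with Filebeat's built-in test commands (I'm not sure whether test output supports the Kafka output, so that one may just report that testing isn't available):

filebeat test config -c filebeat.yml
filebeat test output -c filebeat.yml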

Here is my logstash config:

input {
    kafka {
        bootstrap_servers => "xxx.232:9092"
        topics => ["logs"]
        codec => "json"
    }
}

output {
    elasticsearch {
        hosts => ["http://xxx.52:9200/"]
        index => "logstash-%{+YYYY.MM.dd}"
    }
}
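
While debugging, the elasticsearch output can be temporarily swapped for a stdout output, to check whether events make it through Logstash at all:

output {
    stdout {
        codec => rubydebug
    }
}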

On the kafka server I added the logs topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logs
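
To check whether messages from Filebeat actually reach the broker, Kafka's bundled console consumer can read the topic directly (run on the Kafka server; localhost:9092 assumes the default listener):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logs --from-beginning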

I tried downgrading the Kafka version from 2.4.0 to 2.2.2, but it didn't help.
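
I also noticed that output.kafka has a version setting for the broker protocol version, which I left at its default; pinning it would look like this (I'm not sure which values my Filebeat accepts, so this is just a guess):

output.kafka:
  version: '2.0.0'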

These are some strange log lines from the Filebeat console:

2020-11-04T15:23:16.914+0100    INFO    [publisher_pipeline_output]     pipeline/output.go:152  Connection to kafka(xxx.232:9092) established
2020-11-04T15:23:16.914+0100    INFO    [publisher]     pipeline/retry.go:221   retryer: send unwait signal to consumer
2020-11-04T15:23:16.915+0100    INFO    [publisher]     pipeline/retry.go:225     done

Hi @Pavle_Ilic 🙂

I have no idea what the issue might be, but I'd try removing the compression setting from the Filebeat output and the codec from the Logstash Kafka input to see if that helps.

Generally speaking, try the most barebones configuration possible and add parameters back one by one until it fails; start from something like the sketch below. It might also be that you have set up 1 partition with kafka-topics.sh but are then configuring Filebeat with round_robin partitioning.
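
A minimal pair would look something like this (just a sketch, keeping your own hosts and topic). Filebeat:

output.kafka:
  hosts: ["xxx.232:9092"]
  topic: 'logs'

and the Logstash input:

input {
    kafka {
        bootstrap_servers => "xxx.232:9092"
        topics => ["logs"]
    }
}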

Full logs (or, at least, more logs) will also be useful.
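
For example, running Filebeat in the foreground with all debug selectors enabled will show exactly what the Kafka client is doing:

filebeat -e -d "*"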

Hi @Mario_Castro,
Thanks for the answer.
I've tried removing everything except hosts and topic from output.kafka in my filebeat.yml, and I also removed the codec from the Logstash config, so I've tried with only the necessary configuration; it still doesn't work.
Here are the full logs from the Filebeat console with the minimal config:

2020-11-05T11:09:56.998+0100    INFO    instance/beat.go:647    Home path: [C:\ELK\filebeat] Config path: [C:\ELK\filebeat] Data path: [C:\ELK\filebeat\data] Logs path: [C:\ELK\filebeat\logs]
2020-11-05T11:09:56.998+0100    INFO    instance/beat.go:655    Beat ID: c23a3e6e-4df2-4eb9-8dfb-d2e23acc9757
2020-11-05T11:09:57.015+0100    INFO    [beat]  instance/beat.go:983    Beat info       {"system_info": {"beat": {"path": {"config": "C:\\ELK\\filebeat", "data": "C:\\ELK\\filebeat\\data", "home": "C:\\ELK\\filebeat", "logs": "C:\\ELK\\filebeat\\logs"}, "type": "filebeat", "uuid": "c23a3e6e-4df2-4eb9-8dfb-d2e23acc9757"}}}
2020-11-05T11:09:57.015+0100    INFO    [beat]  instance/beat.go:992    Build info      {"system_info": {"build": {"commit": "f79387d32717d79f689d94fda1ec80b2cf285d30", "libbeat": "7.8.0", "time": "2020-06-14T18:15:35.000Z", "version": "7.8.0"}}}
2020-11-05T11:09:57.016+0100    INFO    [beat]  instance/beat.go:995    Go runtime info {"system_info": {"go": {"os":"windows","arch":"amd64","max_procs":12,"version":"go1.13.10"}}}
2020-11-05T11:09:57.016+0100    INFO    [add_cloud_metadata]    add_cloud_metadata/add_cloud_metadata.go:89     add_cloud_metadata: hosting provider type not detected.
2020-11-05T11:09:57.025+0100    INFO    [beat]  instance/beat.go:999    Host info       {"system_info": {"host": {"architecture":"x86_64","boot_time":"2020-10-15T03:26:10.45+02:00","name":"Programer20","ip":["fe80::2df2:d890:4844:9860/64","192.168.182.14/24","fe80::7c5f:1ea:71d7:ed10/64","192.168.56.1/24","::1/128","127.0.0.1/8","fe80::d17f:f2c9:82ab:1eb5/64","172.17.114.209/28"],"kernel_version":"10.0.18362.1139 (WinBuild.160101.0800)","mac":["04:d4:c4:25:ae:17","0a:00:27:00:00:1e","00:15:5d:83:97:82"],"os":{"family":"windows","platform":"windows","name":"Windows 10 Pro","version":"10.0","major":10,"minor":0,"patch":0,"build":"18363.1139"},"timezone":"CET","timezone_offset_sec":3600,"id":"3439eb83-6fc1-47ab-b085-59ac9e4298b1"}}}
2020-11-05T11:09:57.025+0100    INFO    [beat]  instance/beat.go:1028   Process info    {"system_info": {"process": {"cwd": "C:\\ELK\\filebeat", "exe": "C:\\ELK\\filebeat\\filebeat.exe", "name": "filebeat.exe", "pid": 15100, "ppid": 13184, "start_time": "2020-11-05T11:09:56.915+0100"}}}
2020-11-05T11:09:57.025+0100    INFO    instance/beat.go:310    Setup Beat: filebeat; Version: 7.8.0
2020-11-05T11:09:57.026+0100    INFO    [publisher]     pipeline/module.go:113  Beat name: Programer20
2020-11-05T11:09:57.028+0100    WARN    beater/filebeat.go:156  Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2020-11-05T11:09:57.028+0100    INFO    instance/beat.go:463    filebeat start running.
2020-11-05T11:09:57.028+0100    INFO    [monitoring]    log/log.go:118  Starting metrics logging every 30s
2020-11-05T11:09:57.029+0100    WARN    beater/filebeat.go:339  Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2020-11-05T11:09:57.031+0100    INFO    registrar/registrar.go:145      Loading registrar data from C:\ELK\filebeat\data\registry\filebeat\data.json
2020-11-05T11:09:57.055+0100    INFO    registrar/registrar.go:152      States Loaded from registrar: 5123
2020-11-05T11:09:57.055+0100    INFO    [crawler]       beater/crawler.go:71    Loading Inputs: 1
2020-11-05T11:09:57.059+0100    INFO    log/input.go:152        Configured paths: [C:\Users\denis.kuzner\Documents\transaction-log-service\logs\logstash\*]
2020-11-05T11:09:57.059+0100    INFO    [crawler]       beater/crawler.go:141   Starting input (ID: %d)6811744816269519122
2020-11-05T11:09:57.060+0100    INFO    log/harvester.go:297    Harvester started for file: C:\Users\denis.kuzner\Documents\transaction-log-service\logs\logstash\transaction-log-service.04-11-2020.0.log.zip
2020-11-05T11:09:57.062+0100    INFO    log/input.go:152        Configured paths: [c:\data\log\mongod.log]
2020-11-05T11:09:57.062+0100    INFO    [crawler]       beater/crawler.go:108   Loading and starting Inputs completed. Enabled inputs: 1
2020-11-05T11:09:57.062+0100    INFO    cfgfile/reload.go:164   Config reloader started
2020-11-05T11:09:57.067+0100    INFO    log/input.go:152        Configured paths: [c:\data\log\mongod.log]
2020-11-05T11:09:57.067+0100    INFO    cfgfile/reload.go:224   Loading of config files completed.
2020-11-05T11:10:17.060+0100    INFO    log/harvester.go:297    Harvester started for file: C:\Users\denis.kuzner\Documents\transaction-log-service\logs\logstash\transaction-log-service.log
2020-11-05T11:10:18.060+0100    INFO    [publisher_pipeline_output]     pipeline/output.go:144  Connecting to kafka(xxx.232:9092)
2020-11-05T11:10:18.060+0100    INFO    [publisher]     pipeline/retry.go:221   retryer: send unwait signal to consumer
2020-11-05T11:10:18.061+0100    INFO    [publisher]     pipeline/retry.go:225     done
2020-11-05T11:10:18.060+0100    INFO    [publisher_pipeline_output]     pipeline/output.go:152  Connection to kafka(xxx.232:9092) established
2020-11-05T11:10:26.117+0100    INFO    [publisher]     pipeline/retry.go:221   retryer: send unwait signal to consumer
2020-11-05T11:10:26.117+0100    INFO    [publisher]     pipeline/retry.go:225     done
2020-11-05T11:10:27.028+0100    INFO    [monitoring]    log/log.go:145  Non-zero metrics in the last 30s        {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":78,"time":{"ms":78}},"total":{"ticks":156,"time":{"ms":156},"value":0},"user":{"ticks":78,"time":{"ms":78}}},"handles":{"open":302},"info":{"ephemeral_id":"96562827-416f-4204-bb4a-bbacadfcfb62","uptime":{"ms":30074}},"memstats":{"gc_next":26729648,"memory_alloc":16716496,"memory_total":47417624,"rss":55205888},"runtime":{"goroutines":53}},"filebeat":{"events":{"active":1,"added":5,"done":4},"harvester":{"files":{"a317104f-e95a-4c4e-be2a-30880da6e664":{"last_event_published_time":"2020-11-05T11:10:17.060Z","last_event_timestamp":"2020-11-05T11:10:17.060Z","name":"C:\\Users\\denis.kuzner\\Documents\\transaction-log-service\\logs\\logstash\\transaction-log-service.log","read_offset":6729,"size":6729,"start_time":"2020-11-05T11:10:17.060Z"},"cec98e9d-2980-40e1-b287-3fa5cf957215":{"last_event_published_time":"","last_event_timestamp":"","name":"C:\\Users\\denis.kuzner\\Documents\\transaction-log-service\\logs\\logstash\\transaction-log-service.04-11-2020.0.log.zip","read_offset":3890,"size":4074,"start_time":"2020-11-05T11:09:57.060Z"}},"open_files":2,"running":2,"started":2}},"libbeat":{"config":{"module":{"running":0},"reloads":1,"scans":1},"output":{"events":{"active":1,"batches":2,"failed":1,"total":2},"type":"kafka"},"outputs":{"kafka":{"bytes_read":588,"bytes_write":164}},"pipeline":{"clients":2,"events":{"active":1,"filtered":4,"published":1,"retry":2,"total":5}}},"registrar":{"states":{"current":5123,"update":4},"writes":{"success":4,"total":4}},"system":{"cpu":{"cores":12}}}}}
2020-11-05T11:10:34.700+0100    INFO    [publisher]     pipeline/retry.go:221   retryer: send unwait signal to consumer
2020-11-05T11:10:34.700+0100    INFO    [publisher]     pipeline/retry.go:225     done

I hope this helps, because I'm really stuck and don't know how to approach this anymore.
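
One more thing I can still try is describing the topic on the broker, to confirm it exists and has a leader (using ZooKeeper at localhost:2181, same as when I created it):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic logs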
