Filebeat with Kafka Does Not Insert Message Missing a Colon into Kibana

I am working with an application that writes to a log file. Filebeat ships the log data to a central logging server through its Kafka output. I am testing the process by appending to the log file with echo.

This command writes data to the log index:

echo 2024-05-17 21:35:50,394 - daemon - INFO: - TEST LOG MESSAGE  >> my_daemon.log 

This command does not write data to the log index:

echo 2024-05-17 21:35:50,394 - daemon - INFO - TEST LOG MESSAGE  >> my_daemon.log 

Notice that the second command differs from the first only by omitting a colon after "INFO". This suggests to me that something is rejecting the message based on format.

Where should I look to find what is validating and rejecting the message?

I am using Filebeat 7.12.1:

# filebeat version
filebeat version 7.12.1 (amd64), libbeat 7.12.1 [651a2ad1225f3d4420a22eba847de385b71f711d built 2021-04-20 20:58:32 +0000 UTC]

Here is my filebeat.yml file.

filebeat.inputs:
- type: log
  paths:
    - /opt/my/path/api/my_web.log
    - /opt/my/path/api/my_daemon.log
    - /opt/my/path/api/my_daemon.error.log

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

fields: {app_name: "Application Name",app_campus: "US",app_env: "PROD",app_client: "MyApp"}

output.kafka:
  enabled: true
  version: '0.10.0.1'
  hosts: ["server01.domain.com:9092", "server02.domain.com:9092", "server03.domain.com:9092"]
  topic: 'MyTopic'
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000000

This is my kafka.yml file.

# Module: kafka
# Docs: https://www.elastic.co/guide/en/beats/filebeat/7.x/filebeat-module-kafka.html

- module: kafka
  # All logs
  log:
    enabled: true

    # Set custom paths for Kafka. If left empty,
    # Filebeat will look under /opt.
    #var.kafka_home:

    # Set custom paths for the log files. If left empty,
    # Filebeat will choose the paths depending on your OS.
    #var.paths:

It is not clear to me whether Filebeat or Kafka is the entity performing the validation. This might be a Kibana question.

The central logging cluster is named "central_logging". It is running Elasticsearch 7.17.18.

I have determined that the log message does reach a Kafka server, so the question becomes why the message is not written to central logging.

I set up a consumer using the Python package confluent-kafka. The consumer received this line:

Received message: {"@timestamp":"2024-05-17T22:18:25.651Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.12.1"},"message":"2024-05-17 22:19:03,394 - daemon - INFO - TEST LOG MESSAGE","input":{"type":"log"},"fields":{"app_client":"AppClient","app_name":"ApplicationName","app_campus":"US","app_env":"PROD"},"ecs":{"version":"1.8.0"},"host":{"name":"appserver.domain.com"},"agent":{"name":"appserver.domain.com","type":"filebeat","version":"7.12.1","hostname":"appserver.domain.com","ephemeral_id":"d2d21bee-38f6-4482-b03a-a400aabb934a","id":"71d70255-b66f-46e2-8acd-5faeb49c05d0"},"log":{"offset":1795613,"file":{"path":"/opt/analytics/my/path/my_daemon.log"}}}

Notice that there is no colon after "INFO".
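
For reference, the test consumer looked roughly like this (a minimal sketch, assuming the broker list and topic from the filebeat.yml above; the group id is made up for this test):

from confluent_kafka import Consumer

# Minimal test consumer. Broker list and topic come from the filebeat.yml above;
# the group id is invented for this test.
conf = {
    "bootstrap.servers": "server01.domain.com:9092,server02.domain.com:9092,server03.domain.com:9092",
    "group.id": "filebeat-colon-test",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(conf)
consumer.subscribe(["MyTopic"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # wait up to one second for a message
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        # Filebeat publishes each event as a JSON document in the message value
        print("Received message: {}".format(msg.value().decode("utf-8")))
finally:
    consumer.close()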

Where should I look to find what entity is validating and rejecting this log message?

You need to provide a little more context.

You have Filebeat sending logs to Kafka, but how do the logs go from your Kafka to your Elasticsearch?

What is consuming the messages from your Kafka and sending them to Elasticsearch?

There is a missing piece here.

I have found the cause:

Kafka sends the data to Logstash, which runs an event pipeline.

"The Logstash event processing pipeline has three stages: inputs → filters → outputs." (How Logstash Works | Logstash Reference [8.13] | Elastic)

Filters are intermediary processing devices in the Logstash pipeline. You can combine filters with conditionals to perform an action on an event if it meets certain criteria.

In my case, the Logstash pipeline contains a grok filter that looks like this:

    grok {
        break_on_match => true
        match =>  {
            "message" => [
                "%{TIMESTAMP_ISO8601:origin_timestamp} - %{WORD:logging_object} - %{WORD:log_level}: %{IPORHOST:[extended_properties] ...

The pattern contains a literal colon right after %{WORD:log_level}. When a log line has no colon immediately following the log level, the grok match fails and the event is never written to the index.

I think you can use %{WORD:log_level}(:)? to make the : optional.
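
A quick way to sanity-check that suggestion outside of Logstash is a rough Python approximation of just that part of the pattern (simplified stand-ins for the grok pieces, not the exact grok regex):

import re

# Rough stand-ins for the grok pieces involved (illustration only):
#   %{TIMESTAMP_ISO8601} -> \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}
#   %{WORD}              -> \w+
strict = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - \w+ - \w+: ")
optional = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} - \w+ - \w+(:)? ")

lines = [
    "2024-05-17 21:35:50,394 - daemon - INFO: - TEST LOG MESSAGE",
    "2024-05-17 21:35:50,394 - daemon - INFO - TEST LOG MESSAGE",
]

for line in lines:
    print(line)
    print("  strict pattern matches:        ", bool(strict.match(line)))
    print("  optional-colon pattern matches:", bool(optional.match(line)))

With the two test lines from above, the strict pattern only matches the line that has the colon, while the optional-colon variant matches both.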
