Filebeat 5.0 output to Kafka multiple topics

I installed Filebeat 5.0 on my app server and have 3 Filebeat prospectors. Each prospector points to a different log path and outputs to a single Kafka topic called myapp_applog, and everything works fine.

My Filebeat output configuration for one topic - Working

    output.kafka:
        # initial brokers for reading cluster metadata
        hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
    
        # message topic selection + partitioning
        topic: 'myapp_applog'
        partition.round_robin:
          reachable_only: false
    
        required_acks: 1
        compression: gzip
        max_message_bytes: 1000000

What I want to do is send each of the log files to a separate topic based on a condition (see the documentation section on topics). I have tried to do this, but no data is being sent to any of the topics. Does anyone know why my conditions do not match, or whether they are correct? I can't seem to find an example of how to correctly use the "topics" conditions.

Here is my Kafka output configuration for multiple topics.

Not Working

    output.kafka:
        # initial brokers for reading cluster metadata
        hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

        # message topic selection + partitioning
        topics:
          - topic: 'myapp_applog'
            when:
              equals:
                document_type: applog_myappapi
          - topic: 'myapp_applog_stats'
            when:
              equals:
                document_type: applog_myappapi_stats
          - topic: 'myapp_elblog'
            when:
              equals:
                document_type: elblog_myappapi
        partition.round_robin:
          reachable_only: false

        required_acks: 1
        compression: gzip
        max_message_bytes: 1000000

Here is the full filebeat.yml configuration file.

    ################### Filebeat Configuration Example #########################
    ############################# Filebeat ######################################
    filebeat.prospectors:
        # App logs - prospector
        - input_type: log
          paths:
            - /myapp/logs/myapp.log
          exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
          exclude_files: [".gz$", ".tmp"]
          fields:
            api: myappapi
            environment: STG
          ignore_older: 24h
          document_type: applog_myappapi
          scan_frequency: 1s

          # Multiline on timestamp, YYYY-MM-DD
          # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html
          multiline:
            pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
            negate: true
            match: after
            max_lines: 500
            timeout: 5s

        # Server Stats - prospector
        - input_type: log
          paths:
            - /myapp/logs/serverstats.log

          # Exclude messages with log level
          exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
          exclude_files: [".gz$", ".tmp"]
          fields:
            api: myappapi
            environment: STG
          ignore_older: 24h
          document_type: applog_myappapi_stats
          scan_frequency: 1s

        # ELB prospector
        - input_type: log
          paths:
            - /var/log/httpd/elasticbeanstalk-access_log
          document_type: elblog_myappapi
          fields:
            api: myappapi
            environment: STG
          exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
          exclude_files: [".gz$", ".tmp"]
          ignore_older: 24h

          # At 0s, scanning is done as often as possible. Default: 10s
          scan_frequency: 1s
    registry_file: /var/lib/filebeat/registry

    ############################# Output ##########################################
    # Configure what outputs to use when sending the data collected by the beat.
    # Multiple outputs may be used.

    output.kafka:
        # initial brokers for reading cluster metadata
        hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

        # message topic selection + partitioning
        topics:
          - topic: 'myapp_applog'
            when:
              equals:
                document_type: applog_myappapi
          - topic: 'myapp_applog_stats'
            when:
              equals:
                document_type: applog_myappapi_stats
          - topic: 'myapp_elblog'
            when:
              equals:
                document_type: elblog_myappapi
        partition.round_robin:
          reachable_only: false

        required_acks: 1
        compression: gzip
        max_message_bytes: 1000000

The document_type configured per prospector becomes the event's type field; the event has no field named document_type. That's why the filter won't match.
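For illustration, here is a sketch of the event the first prospector would publish (field values taken from the config above; the exact set of metadata fields varies by version). Note there is a type field but no document_type field, so equals on document_type can never match:

    {
      "@timestamp": "...",
      "type": "applog_myappapi",
      "fields": {
        "api": "myappapi",
        "environment": "STG"
      },
      "source": "/myapp/logs/myapp.log",
      "message": "..."
    }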

Instead of conditionals, consider using a format string like:

    filebeat.prospectors:
    - ...
      document_type: myapp_applog
    - ...
      document_type: myapp_applog_stats
    - ...
      document_type: myapp_elblog

    output.kafka:
      topic: '%{[type]}' # use document_type to set topic

By the way, the topic field in the conditionals also supports format strings.
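For example, a sketch combining a conditional rule with format-string topics (the topic name built from the custom environment field is purely illustrative, and this assumes the plain topic setting acts as the default when no rule matches):

    output.kafka:
      hosts: ["broker.1.ip.address:9092"]

      # a conditional rule whose topic is itself a format string;
      # with the config above this would resolve to 'STG_elblog'
      topics:
        - topic: '%{[fields.environment]}_elblog'
          when:
            equals:
              type: elblog_myappapi

      # default topic (also a format string) for events
      # matching none of the rules above
      topic: '%{[type]}'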

So the conditionals on multiple topics don't work?

I don't need to use topics - topic; it will pick up the log based on the type and send it to that topic?

Am I understanding this correctly?

So the conditionals on multiple topics don't work?

Conditionals do work, but they look like a little overkill here. You have to replace document_type with type in your conditionals.
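In other words, a corrected version of the topics block from the question would look like this (same structure, only the field name in the conditions changed; the other output settings are omitted for brevity):

    output.kafka:
        topics:
          - topic: 'myapp_applog'
            when:
              equals:
                type: applog_myappapi
          - topic: 'myapp_applog_stats'
            when:
              equals:
                type: applog_myappapi_stats
          - topic: 'myapp_elblog'
            when:
              equals:
                type: elblog_myappapi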

It just looks like overkill maintaining the conditionals if format strings will do the trick.

I don't need to use topics - topic; it will pick up the log based on the type and send it to that topic?

The topic setting is a format string. Format strings can access fields in events. In the sample I'm setting the topic for each event by reading the type field, which you have already configured in document_type.
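So, with the suggested document_type values, the single format-string topic would route events roughly like this (a sketch of how the string resolves, not additional required config):

    output.kafka:
      # '%{[type]}' is replaced per event with the value of its type field:
      #   type: myapp_applog        -> Kafka topic 'myapp_applog'
      #   type: myapp_applog_stats  -> Kafka topic 'myapp_applog_stats'
      #   type: myapp_elblog        -> Kafka topic 'myapp_elblog'
      topic: '%{[type]}'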

See the Format String docs.

OK, thanks for that. I will post back once I get it working to confirm whether it worked OK.
