Logstash Kafka Output How to specify the Kafka Partition


(Albert Stark) #1

We have a filebeat sending logs to logstash 1. Logstash 1 writes the logs to Kafka. Logstash 2 read the logs from kafka and writes the logs to elastic. This pipeline is work we have logs going into elastic. The issue is we can not specify which kafka partition to use. The partition is being pinned to either 0, 1 or 2. Not the partition we specified.

We are writing the topic monatee_loggy_tpc. We want the logs to go to partition 6140. We thought that by setting message_key to 6140 kafka would write the logs to partition 6140.

In logstash 2 we are actually seeing the logs being written to kafka.partition with a value of 0, or 1 or 2. What are we doing wrong.

The pipeline configurations for logstash 1 and logstash 2 are included below.

The pipeline configuration for logstash 1

    input {
        beats {
            port => "5044"
            type => syslog
        }
    }
    filter {
        grok {
          match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
        }
        date {
          match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
        }

        if [syslog_hostname]{
            mutate {
               add_field => [ "message_key", "6140" ]
            }
       }
       ruby {
         code => "event.set('logstash_1_received_time', Time.now.utc.strftime('%FT%T.%L') )"
       }
       mutate {
          add_field => [ "logstash_1_server", "r00j9un0c" ]
       }
    }
    output {
        kafka {
          bootstrap_servers          => "rsomtapae182.bnymellon.net:9092,rsomtapae183.bnymellon.net:9092,rsomtapae184.bnymellon.net:9092"
          topic_id                   => "monatee_loggy_tpc"
          jaas_path                  => "/opt/pki/logstash_config_kafka/kafka_client_jaas_logstash.conf"
          security_protocol          => "SASL_PLAINTEXT"
          sasl_kerberos_service_name => "kafka"
          sasl_mechanism             => "PLAIN"
          codec => plain {
            format => "%{logstash_1_received_time} %{logstash_1_server} %{message}"
          }
        }
    }

The pipeline configuration for logstash 2

    input {
        kafka {
          bootstrap_servers          => "rsomtapae182.bnymellon.net:9092,rsomtapae183.bnymellon.net:9092,rsomtapae184.bnymellon.net:9092"
          topics                     => [ "monatee_loggy_tpc" ]
          jaas_path                  => "/opt/pki/logstash_config_kafka/kafka_client_jaas_logstash.conf"
          security_protocol          => "SASL_PLAINTEXT"
          sasl_kerberos_service_name => "kafka"
          sasl_mechanism             => "PLAIN"
          group_id                   => "monatee_loggy_tpc"
          decorate_events            => true
        }
    }
    # The filter part of this file is commented out to indicate that it is
    # optional.
    filter {
        grok {
          match => { "message" => "(?<logstash_1_received_time>[0-9][0-9zZtT\:\.\-]*) (?<logstash_1_server>[a-zA-z][a-zA-z0-9\.]*) %{SYSLOG5424PRI}*%{SYSLOGTIMESTAMP:syslog_time} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }

          # need it for alert queuering
          add_field => [ "datacenter", "cnj" ]
          add_field => [ "env",        "dev" ]
          add_field => [ "family",     "pki" ]
          add_field => [ "app",        "monitoring" ]
          add_field => [ "service",    "loggy" ]
          add_field => [ "component",  "logstash_2" ]
          add_field => [ "logtype",    "syslog" ]
        }
        date {
          match => [ "logstash_1_received_time", "ISO8601" ]
        }
        date {
          match => [ "syslog_time",
                     "MMM  d HH:mm:ss",
                     "MMM dd HH:mm:ss",
                     "MMM d HH:mm:ss",
                     "ISO8601" ]
        }
        ruby {
          code => "event.set('logstash_2_received_time', Time.now.utc.strftime('%FT%T.%L') )"
        }
        mutate {
          add_field => [ "logstash_2_server", "r00j9un0c" ]
        }
    }
    output {
        # stdout { codec => rubydebug }
        elasticsearch {
          hosts         => ["monatee-loggy-master-tpc.dev.bnymellon.net:80"]
          index         => "monatee_loggy_syslog_tpc-%{+YYYY.MM.dd}"
          document_type => "syslog"
        }
    }

(Paris Mermigkas) #2

First of all, what's the use-case of needing to flush to specific partitions?

Also, the message_key configuration property does not work exactly like this. It just guarantees that events with the same message key will end up in the same partition, not which partition that will be. So you usually use it with a field reference.

(Btw, that partition 6140 looks odd, I find it unlikely that a kafka topic would have 6141 partitions).


(system) #3

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.