We have a Filebeat sending logs to Logstash 1. Logstash 1 writes the logs to Kafka. Logstash 2 reads the logs from Kafka and writes them to Elasticsearch. The pipeline works: logs are flowing into Elasticsearch. The issue is that we cannot control which Kafka partition is used. Events are being pinned to partition 0, 1, or 2 — not the partition we specified.
We are writing to the topic monatee_loggy_tpc. We want the logs to go to partition 6140. We thought that by setting message_key to 6140, Kafka would write the logs to partition 6140.
In Logstash 2 we actually see the logs arriving with a kafka.partition value of 0, 1, or 2. What are we doing wrong?
The pipeline configurations for logstash 1 and logstash 2 are included below.
The pipeline configuration for logstash 1
# Listen for events shipped by Filebeat and tag them as syslog.
input {
  beats {
    type => "syslog"
    port => 5044
  }
}
filter {
  # Split the raw syslog line into timestamp / host / message fields.
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
  }
  # Use the syslog timestamp as the event @timestamp.
  date {
    match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
  # Only events that parsed successfully (syslog_hostname exists) get a key.
  # NOTE(review): adding a "message_key" FIELD does not key the Kafka record
  # by itself — the kafka output must reference it via its message_key option.
  if [syslog_hostname] {
    mutate {
      # Hash form of add_field; the array form used previously is deprecated.
      add_field => { "message_key" => "6140" }
    }
  }
  # Stamp the event with the time this Logstash instance received it (UTC).
  ruby {
    code => "event.set('logstash_1_received_time', Time.now.utc.strftime('%FT%T.%L') )"
  }
  mutate {
    add_field => { "logstash_1_server" => "r00j9un0c" }
  }
}
output {
  kafka {
    bootstrap_servers => "rsomtapae182.bnymellon.net:9092,rsomtapae183.bnymellon.net:9092,rsomtapae184.bnymellon.net:9092"
    topic_id => "monatee_loggy_tpc"
    # BUG FIX: the message_key option was never set, so every record was
    # produced with a null key and the Kafka producer spread records across
    # the topic's partitions (hence events landing on partitions 0, 1 and 2).
    # This sends the "message_key" field added by the filter as the record key.
    message_key => "%{message_key}"
    # NOTE(review): Kafka does NOT interpret the key as a partition number.
    # The default partitioner hashes the key and takes it modulo the topic's
    # partition count, so key "6140" on a 3-partition topic still maps to
    # partition 0, 1 or 2 (consistently the same one, since the key is fixed).
    # Landing on literal partition 6140 would require the topic to have at
    # least 6141 partitions; this plugin has no option to pin a partition id.
    jaas_path => "/opt/pki/logstash_config_kafka/kafka_client_jaas_logstash.conf"
    security_protocol => "SASL_PLAINTEXT"
    sasl_kerberos_service_name => "kafka"
    sasl_mechanism => "PLAIN"
    codec => plain {
      format => "%{logstash_1_received_time} %{logstash_1_server} %{message}"
    }
  }
}
The pipeline configuration for logstash 2
# Consume the topic produced by Logstash 1.
input {
  kafka {
    topics => [ "monatee_loggy_tpc" ]
    group_id => "monatee_loggy_tpc"
    bootstrap_servers => "rsomtapae182.bnymellon.net:9092,rsomtapae183.bnymellon.net:9092,rsomtapae184.bnymellon.net:9092"
    # Attach Kafka metadata (topic, partition, offset, key) to each event.
    decorate_events => true
    # SASL/Kerberos client settings.
    security_protocol => "SASL_PLAINTEXT"
    sasl_mechanism => "PLAIN"
    sasl_kerberos_service_name => "kafka"
    jaas_path => "/opt/pki/logstash_config_kafka/kafka_client_jaas_logstash.conf"
  }
}
# The filter section below is optional; remove it if no parsing or
# enrichment is needed.
filter {
  # Re-parse the line Logstash 1 formatted: its receive time and server name,
  # followed by the original syslog message.
  grok {
    # BUG FIX: the character classes used [a-zA-z]; the A-z range also matches
    # the punctuation characters [ \ ] ^ _ ` between 'Z' and 'a' in ASCII.
    # Corrected to [a-zA-Z].
    match => { "message" => "(?<logstash_1_received_time>[0-9][0-9zZtT\:\.\-]*) (?<logstash_1_server>[a-zA-Z][a-zA-Z0-9\.]*) %{SYSLOG5424PRI}*%{SYSLOGTIMESTAMP:syslog_time} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    # Static routing metadata, needed for alert querying.
    # Hash form of add_field; the array form used previously is deprecated.
    add_field => {
      "datacenter" => "cnj"
      "env" => "dev"
      "family" => "pki"
      "app" => "monitoring"
      "service" => "loggy"
      "component" => "logstash_2"
      "logtype" => "syslog"
    }
  }
  date {
    match => [ "logstash_1_received_time", "ISO8601" ]
  }
  date {
    # Duplicate "MMM d HH:mm:ss" entry removed.
    match => [ "syslog_time",
               "MMM d HH:mm:ss",
               "MMM dd HH:mm:ss",
               "ISO8601" ]
  }
  # Stamp the event with the time this Logstash instance received it (UTC).
  ruby {
    code => "event.set('logstash_2_received_time', Time.now.utc.strftime('%FT%T.%L') )"
  }
  mutate {
    add_field => { "logstash_2_server" => "r00j9un0c" }
  }
}
output {
  # Uncomment for local debugging of the fully-processed event:
  # stdout { codec => rubydebug }
  elasticsearch {
    # NOTE(review): port 80 suggests traffic goes through a proxy/load
    # balancer in front of the cluster — confirm.
    hosts => ["monatee-loggy-master-tpc.dev.bnymellon.net:80"]
    # Daily index, e.g. monatee_loggy_syslog_tpc-2024.01.31.
    index => "monatee_loggy_syslog_tpc-%{+YYYY.MM.dd}"
    # NOTE(review): document_type is deprecated as of Elasticsearch 6/7 —
    # verify against the cluster version before upgrading.
    document_type => "syslog"
  }
}