Kafka error

I am using Kafka as a queue.

My data is getting into the Kafka cluster just fine, and the majority of it is being output to my ES cluster without issue, except for one topic.

My config for the Logstash node consuming messages from the Kafka cluster is:

input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.4:2181"
    topic_id => "topbeat"
  }
}
output {
    stdout { codec => rubydebug }
}

When I run this I get the following error:

log4j, [2016-03-05T14:39:29.025]  WARN: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_OP-01-VM-554-1457206763224-c9f936ab-leader-finder-thread], Failed to find leader for Set([topbeat,1], [topbeat,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(topbeat)] from broker [ArrayBuffer(id:0,host:10.1.84.9,port:9092, id:12,host:10.1.84.14,port:9092, id:6,host:10.1.84.8,port:9092, id:5,host:10.1.84.7,port:9092, id:10,host:10.1.84.12,port:9092, id:9,host:10.1.84.11,port:9092, id:2,host:10.1.84.4,port:9092, id:4,host:10.1.84.6,port:9092, id:11,host:10.1.84.13,port:9092, id:3,host:10.1.84.5,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:114)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 3 more

The strangest part is that I have two other topics that are being consumed just fine; it is only this one topic. Any assistance would be appreciated.

Thanks

Hi,
as you can see, the problem is not with Logstash but with Kafka:
"[logstash_OP-01-VM-554-1457206763224-c9f936ab-leader-finder-thread], Failed to find leader for Set([topbeat,1], [topbeat,0])"

The best solution, I think, would be to create a new topic and have Logstash write to and read from that new topic instead.
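Roughly like this on the consumer side, just as a sketch (the topic name below is only an example for whatever new topic you create, and I am assuming your third ZooKeeper is 10.1.84.5 as in your later config; your original zk_connect listed 10.1.84.4 twice):

input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.5:2181"
    topic_id => "topbeat-v2"    # example name for the newly created topic
  }
}
output {
  stdout { codec => rubydebug }
}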

You were right, it was a Kafka problem. I recreated the topics and that cleared the error.

I have another question:

I now have a topic containing Bro IDS data from a 4.5 gig network. The logs are being read off the sensor with no problem, but it appears they cannot be ingested into the ES cluster as fast as they are written.

Kafka:
I have 3 ZooKeepers and 9 brokers.
One topic named bro-logs with 8 partitions.

Logstash:
I have 4 Logstash nodes consuming the logs from the Kafka cluster. The configs are:

input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.5:2181"
    topic_id => "bro-logs"
  }
}

FILTERS HERE

output {
##########    BRO Outputs -> ES Cluster    ##########
  if [type] == "BRO" {
    if [sensor1] == "hostname1" {
      elasticsearch {
        hosts => [ "10.1.55.8:9200","10.1.55.9:9200","10.1.55.10:9200","10.1.55.11:9200","10.1.55.12:9200","10.1.55.13:9200","10.1.55.14:9200","10.1.55.15:9200","10.1.55.16:9200","10.1.55.17:9200","10.1.55.18:9200" ]
        manage_template => false
        index => "sensor1-bro-%{+YYYY.MM.dd}"
      }
    }
    if [sensor2] == "hostname2" {
      elasticsearch {
        hosts => [ "10.1.55.8:9200","10.1.55.9:9200","10.1.55.10:9200","10.1.55.11:9200","10.1.55.12:9200","10.1.55.13:9200","10.1.55.14:9200","10.1.55.15:9200","10.1.55.16:9200","10.1.55.17:9200","10.1.55.18:9200" ]
        manage_template => false
        index => "sensor2-bro-%{+YYYY.MM.dd}"
      }
    }
  }
}

I have the same config running on all 4 Logstash consumer nodes. Is there something that has to be done to consume records at a higher rate, or should I scale horizontally? I thought that 4 Logstash nodes would be able to read this data with no problem.

Any help would be appreciated

Yes :slight_smile: You are right, the best solution is horizontal scaling, but since you already have 4 Logstash instances you could add this setting to your kafka input:

consumer_threads => 1
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-consumer_threads

You have 8 partitions, so you can increase this value to 2 -> 4 Logstash nodes x 2 threads = 8 (the number of your partitions).
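For example (just a sketch of the input block, using the ZooKeeper list and topic from your config):

input {
  kafka {
    zk_connect => "10.1.84.3:2181,10.1.84.4:2181,10.1.84.5:2181"
    topic_id => "bro-logs"
    consumer_threads => 2    # 4 Logstash nodes x 2 threads = 8, matching the 8 partitions
  }
}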

One more thing -> please test another output (for example stdout or file) and check how fast you can get data out of Kafka. For the elasticsearch output you can change these two settings:

flush_size => 500
idle_flush_time => 1

Sometimes it is better to increase these values.
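As a starting point the output block could look roughly like this (a sketch; keep your existing hosts, manage_template and index settings):

elasticsearch {
  # your existing hosts, manage_template and index settings go here
  flush_size => 500        # events per bulk request
  idle_flush_time => 1     # max seconds to wait before flushing a partial batch
}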
Let me know how it goes :slightly_smiling:

So I added

flush_size => 500
idle_flush_time => 1

to my elasticsearch outputs. I also added:

consumer_threads => 2

to the kafka input on my Logstash nodes.

I haven't seen any improvement.

My ES data nodes have 32 GB of RAM and 16 cores. They are virtual machines and I have the ability to add resources to them. Any suggestions on what would give me the biggest bang: CPUs, RAM, or both?

lscpu on my current ES nodes:

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    1
Core(s) per socket:    16
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
Stepping:              7
CPU MHz:               1935.922
BogoMIPS:              3871.84
Hypervisor vendor:     Microsoft
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              20480K
NUMA node0 CPU(s):     0-15

free -g:

              total        used        free      shared  buff/cache   available
Mem:             31          25           0           0           5           5
Swap:             7           0           7

So I can see that my ES nodes appear to be maxing out their RAM. Could this be a reason they are not indexing the documents fast enough?

Marvel is reporting my index rate at 14000+/s

My Logstash indexer nodes passing the data into the ES cluster do not appear to be having any issues with RAM or lag; it appears the issue is at ingestion into the ES cluster. Right now my ES cluster is 12 nodes with the HW specs above.

Please try different values:
flush_size => 5000
idle_flush_time => 5

One more thing - please also add more workers to the elasticsearch output, e.g. 2 or 4.
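Each elasticsearch output would then look roughly like this (a sketch; keep your own hosts, manage_template and index settings):

elasticsearch {
  # existing hosts, manage_template and index settings stay as they are
  workers => 4             # parallel output workers for this output
  flush_size => 5000       # larger bulk batches
  idle_flush_time => 5     # wait up to 5 seconds to fill a batch
}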
How many filter workers are you using?

Below is my filter section. I had to post it in a separate post due to character limitations on this forum.

I will make the other recommended changes and let you know. Is there a formula for the number of workers per number of cores on your hardware?

filter {
  if [type] == "BRO" {
    mutate {
      rename => { "_path" => "log_type" }
    }
    date {
      match => ["ts", "UNIX_MS"]
    }
    if [log_type] == "conn" {
      translate {
        field => "conn_state"
        destination => "conn_state_full"
        dictionary => [
          "S0", "Connection attempt seen, no reply",
          "S1", "Connection established, not terminated",
          "S2", "Connection established and close attempt by originator seen (but no reply from responder)",
          "S3", "Connection established and close attempt by responder seen (but no reply from originator)",
          "SF", "Normal SYN/FIN completion",
          "REJ", "Connection attempt rejected",
          "RSTO", "Connection established, originator aborted (sent a RST)",
          "RSTR", "Established, responder aborted",
          "RSTOS0", "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder",
          "RSTRH", "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator",
          "SH", "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)",
          "SHR", "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator",
          "OTH", "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
        ]
      }
      mutate {
        convert => { "src_ip" => "integer" }
        convert => { "src_port" => "integer" }
        convert => { "src_bytes" => "integer" }
        convert => { "resp_bytes" => "integer" }
        convert => { "missed_bytes" => "integer" }
        convert => { "src_pkts" => "integer" }
        convert => { "orig_ip_bytes" => "integer" }
        convert => { "resp_pkts" => "integer" }
        convert => { "resp_ip_bytes" => "integer" }
      }
    }
    mutate {
      rename => { "id.orig_h" => "src_ip" }
      rename => { "id.orig_p" => "src_port" }
      rename => { "id.resp_h" => "resp_ip" }
      rename => { "id.resp_p" => "resp_port" }
      rename => { "orig_bytes" => "src_bytes" }
      rename => { "orig_pkts" => "src_packets" }
      rename => { "orig_cc" => "src_cc" }
    }
    if [log_type] == "conn" {
      geoip {
        source => "src_ip"
        target => "geoip_src"
      }
      geoip {
        source => "resp_ip"
        target => "geoip_resp"
      }
    }
    else if [log_type] == "notice" {
      geoip {
        source => "src"
        target => "geoip_src"
      }
      geoip {
        source => "dst"
        target => "geoip_resp"
      }
    }
    if [log_type] == "ssl" {
      mutate {
        rename => { "version" => "ssl_version" }
      }
    }
    if [log_type] == "ssh" {
      mutate {
        rename => { "version" => "ssh_version" }
      }
    }
    if [log_type] in ["conn","notice","intel","dns","http_eth1","http_eth2","http_eth3","http_eth4","http_eth5","weird","ssl","ssh","syslog"] {
      if [src_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)|(^224\.0\.0\.)" {  ## Disable private IP space from lookup source ##
        mutate {
          add_field => { "src_Senderbase_lookup" => "http://www.senderbase.org/lookup/?search_string=%{src_ip}" }
          add_field => { "src_CBL_lookup" => "http://cbl.abuseat.org/lookup.cgi?ip=%{src_ip}" }
          add_field => { "src_Spamhaus_lookup" => "http://www.spamhaus.org/query/bl?ip=%{src_ip}" }
          add_field => { "src_DomainTools_lookup" => "http://whois.domaintools.com/%{src_ip}" }
        }
      }
      if [resp_ip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)|(^224\.0\.0\.)" {  ## Disable private IP space from lookup source ##
        mutate {
          add_field => { "resp_Senderbase_lookup" => "http://www.senderbase.org/lookup/?search_string=%{resp_ip}" }
          add_field => { "resp_CBL_lookup" => "http://cbl.abuseat.org/lookup.cgi?ip=%{resp_ip}" }
          add_field => { "resp_Spamhaus_lookup" => "http://www.spamhaus.org/query/bl?ip=%{resp_ip}" }
          add_field => { "resp_DomainTools_lookup" => "http://whois.domaintools.com/%{resp_ip}" }
        }
      }
    }
    if [log_type] in ["http_eth1","http_eth2","http_eth3","http_eth4","http_eth5"] {
      mutate {
        add_field => {"url_full" => "%{host}%{uri}"}
      }
    }
  }
}

I bet those filters take a lot of CPU. Try adjusting your workers (-w). The best way to tune it: turn off all the Logstash nodes except one, turn up the -w value until you appear to max out the CPU on that node, note your throughput (b/c we want to know :wink:), and then apply the same -w to the rest of the nodes. Turn them on and see if they can keep up.

Check out Upgrading to Logstash 2.2, especially the Performance Tuning in 2.2 section:

Since both filters and output workers are on the same thread, this could lead to threads being idle in I/O wait state. Thus, in 2.2, you can safely set -w to a number which is a multiple of the number of cores on your machine. A common way to tune performance is keep increasing the -w beyond the # of cores until performance no longer improves.

Thanks. It appears the Logstash nodes are keeping up, but the Elasticsearch data nodes are having some trouble. My data nodes are:

lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    1
Core(s) per socket:    16
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 63
Model name:            Intel(R) Xeon(R) CPU E5-2699 v3 @ 2.30GHz
Stepping:              2
CPU MHz:               2241.945
BogoMIPS:              4483.89
Hypervisor vendor:     Microsoft
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              46080K
NUMA node0 CPU(s):     0-15

There is 32 GB of RAM with 24 GB set for the Java heap.

I am noticing (using Marvel) that my data nodes are hitting max Java heap every 20 minutes: the JVM heap chart shows the percentage climb to 80, drop to zero, and slowly crawl back up to 80, so my graph is a series of peaks every 20 minutes or so. I'm thinking it is a data node issue, but I'm not sure how to diagnose it further.

I'm looking at the link you sent now

For reference, what are the hardware specs of the Logstash nodes, and what -w did you settle on?

As for ES tuning, you should open a new thread in the Elasticsearch section.

Thank you. I am going to open another thread in the Elasticsearch section for tuning; this got a bit off topic, I agree.

My Logstash nodes, which handle the messages from Kafka, run the filters, and index into Elasticsearch, are as follows:

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                8
On-line CPU(s) list:   0-7
Thread(s) per core:    1
Core(s) per socket:    8
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
Stepping:              7
CPU MHz:               2004.116
BogoMIPS:              4008.23
Hypervisor vendor:     Microsoft
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              20480K
NUMA node0 CPU(s):     0-7

32 GB of RAM with 24 GB set for the Java heap.

If I check RAM usage on these 4 nodes, it is always just 1 GB used with 29 GB free, so it doesn't seem that these nodes are being overwhelmed. Since switching from Redis to Kafka I have noticed an error in my logstash.log:

"status"=>500, "error"=>{"type"=>"timeout_exception", "reason"=>"Failed to acknowledge mapping update within [30s]"}}}, :level=>:warn}

I am going to dive into that this morning. I am using dynamic templates, so I am thinking of trying a predefined template to see if that helps my ingestion rate.
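One way to wire that up would be through the elasticsearch output (just a sketch; the template path and name below are placeholders for whatever mapping file I end up writing):

elasticsearch {
  # existing hosts and index settings
  manage_template => true
  template => "/etc/logstash/templates/bro.json"    # placeholder path to a predefined mapping template
  template_name => "bro"                            # placeholder template name
  template_overwrite => true
}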

From my testing it appears Kafka and the Kafka plugin are working like champs. Kafka is reading Bro logs from my sensor at line rate on a 4.4 gig network; it just seems that Elasticsearch can't keep up because of some setting somewhere.

I tried to spin up an additional Logstash node reading from my Kafka cluster, but when I start Logstash I get the following error:

kafka client threw exception, restarting {:exception=>java.net.UnknownHostException: OP-01-VM-5524: OP-01-VM-5524: unknown error, :level=>:warn}

And it just repeats the error over and over and over.

The hostname listed in the error is not my Kafka node; it is the node I am running Logstash on to read from Kafka.

Does the second instance have a subdomain that it should be resolving from? Make sure it can ping that host.

The Logstash node can ping the Kafka cluster, and there is no software or hardware firewall in between. The strange part is that I rebooted one of my 4 original Logstash nodes and now it is getting the same error, with its own hostname in the message.

Not sure why. The only thing I did before restarting Logstash was add workers to the outputs and threads to the inputs.

I am getting the same error and have not found a solution. Were you able to resolve this issue?

Sorry for the delay. I did get this resolved. It was a DNS issue: I had to make sure that all machines had correct DNS entries, and after that Kafka worked as expected.