I'm new to Kafka and Logstash, but have done a fair amount of studying and have encountered a series of errors which I can't figure out and have no idea how to fix.
... had to cut because character limit
pipeline.RUBY$block$start_input$1(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:517) [?:?]
rock-logstash | at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:145) [?:?]
rock-logstash | at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71) [?:?]
rock-logstash | at org.jruby.runtime.Block.call(Block.java:124) [?:?]
rock-logstash | at org.jruby.RubyProc.call(RubyProc.java:289) [?:?]
rock-logstash | at org.jruby.RubyProc.call(RubyProc.java:246) [?:?]
rock-logstash | at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104) [?:?]
rock-logstash | at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
rock-logstash | [2018-01-19T19:27:49,672][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=suricata_logstash] Discovered coordinator fef92fe6a9d4:9092 (id: 2147483647 rack: null)
rock-logstash | [2018-01-19T19:27:49,672][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=suricata_logstash] Marking the coordinator fef92fe6a9d4:9092 (id: 2147483647 rack: null) dead
rock-logstash | [2018-01-19T19:27:49,731][INFO ][logstash.agent ] Pipelines running {:count=>3, :pipelines=>["bro", "fsf", "suricata"]}
Kafka Log:
had to cut...
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-41 broker=0] __consumer_offsets-41 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-32 broker=0] __consumer_offsets-32 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-3 broker=0] __consumer_offsets-3 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-13 broker=0] __consumer_offsets-13 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:24,124] INFO Updated PartitionLeaderEpoch. New: {epoch:17, offset:18942}, Current: {epoch:16, offset6547} for Partition: suricata-raw-0. Cache now contains 2 entries. (kafka.server.epoch.LeaderEpochFileCache)
rock-kafka | [2018-01-19 19:31:12,579] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
Zookeepeer Log
Just discovered these super helpful error messages ;-p (I'm sure they are helpful to someone - it isn't me hahahaha)
Got user-level KeeperException when processing sessionid:0x10000b5e7ff0000 type:create cxid:0x6f zxid:0x2b1 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
Here is one of my three identical logstash configs (the rest are the same just for bro and fsf)
input {
kafka {
topics => ["suricata-raw"]
add_field => { "[@metadata][stage]" => "suricataraw_kafka" }
# Set this to one per kafka partition to scale up
#consumer_threads => 4
group_id => "suricata_logstash"
bootstrap_servers => "rocksensor1.lan:9092"
codec => json
auto_offset_reset => "earliest"
}
}
filter {
if "_jsonparsefailure" in [tags] {
drop { }
}
# Remove kafka_topic field
mutate {
remove_field => [ "kafka_topic" ]
}
if [@metadata][stage] == "suricataraw_kafka" {
# Set the timestamp
date { match => [ "timestamp", "ISO8601" ] }
}
}
output {
if [@metadata][stage] == "suricataraw_kafka" {
kafka {
codec => json
topic_id => "suricata-clean"
bootstrap_servers => "rocksensor1.lan:9092"
}
elasticsearch {
hosts => ["rockserver1.lan"]
index => "suricata-%{+YYYY.MM.dd}"
manage_template => false
document_type => "%{event_type}"
}
}
}