Kafka Logstash Plugin Not Working 6.12

I'm new to Kafka and Logstash, but I've done a fair amount of studying. I've run into a series of errors that I can't figure out how to fix.

Logstash Log (had to cut because of the character limit):

pipeline.RUBY$block$start_input$1(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:517) [?:?]
    rock-logstash |         at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:145) [?:?]
    rock-logstash |         at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:71) [?:?]
    rock-logstash |         at org.jruby.runtime.Block.call(Block.java:124) [?:?]
    rock-logstash |         at org.jruby.RubyProc.call(RubyProc.java:289) [?:?]
    rock-logstash |         at org.jruby.RubyProc.call(RubyProc.java:246) [?:?]
    rock-logstash |         at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:104) [?:?]
    rock-logstash |         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
    rock-logstash | [2018-01-19T19:27:49,672][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=suricata_logstash] Discovered coordinator fef92fe6a9d4:9092 (id: 2147483647 rack: null)
    rock-logstash | [2018-01-19T19:27:49,672][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=suricata_logstash] Marking the coordinator fef92fe6a9d4:9092 (id: 2147483647 rack: null) dead
    rock-logstash | [2018-01-19T19:27:49,731][INFO ][logstash.agent           ] Pipelines running {:count=>3, :pipelines=>["bro", "fsf", "suricata"]}
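
One thing that jumps out at me in that log: the consumer discovers the coordinator at fef92fe6a9d4:9092 (which looks like a Docker container ID, not a resolvable hostname) and then immediately marks it dead. From what I've read, that pattern usually means the broker is advertising an address clients can't resolve, and the fix is to set advertised.listeners in the broker config. A sketch of what I think the relevant server.properties lines would look like, assuming the broker should be reachable as rocksensor1.lan (the hostname from my Logstash config below):

# Bind on all interfaces inside the container
listeners=PLAINTEXT://0.0.0.0:9092

# Advertise a hostname that clients like Logstash can actually
# resolve, instead of the container's auto-generated hostname
advertised.listeners=PLAINTEXT://rocksensor1.lan:9092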

Kafka Log:

had to cut...

rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-41 broker=0] __consumer_offsets-41 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-32 broker=0] __consumer_offsets-32 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-3 broker=0] __consumer_offsets-3 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:13,456] INFO [Partition __consumer_offsets-13 broker=0] __consumer_offsets-13 starts at Leader Epoch 4 from offset 0. Previous Leader Epoch was: 3 (kafka.cluster.Partition)
rock-kafka | [2018-01-19 19:21:24,124] INFO Updated PartitionLeaderEpoch. New: {epoch:17, offset:18942}, Current: {epoch:16, offset:6547} for Partition: suricata-raw-0. Cache now contains 2 entries. (kafka.server.epoch.LeaderEpochFileCache)
rock-kafka | [2018-01-19 19:31:12,579] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
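
Those repeated leader-epoch bumps make me think the broker has been restarting or bouncing leadership. To see whether my suricata_logstash group is actually committing offsets and making progress, I can describe it with the consumer-groups tool that ships with Kafka (a sketch; the bootstrap host matches my config):

kafka-consumer-groups.sh --bootstrap-server rocksensor1.lan:9092 \
  --describe --group suricata_logstash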

Zookeeper Log:

Just discovered these super helpful error messages ;-p (I'm sure they're helpful to someone - it just isn't me, haha)

Got user-level KeeperException when processing sessionid:0x10000b5e7ff0000 type:create cxid:0x6f zxid:0x2b1 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
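
From what I can tell, the NodeExists error is actually benign - it just means the broker tried to create the /brokers path on startup and it already existed from a previous run. To check what the broker actually registered, I can query ZooKeeper directly with zkCli.sh (a sketch; the ZooKeeper host/port are assumptions for my setup):

# Connect to ZooKeeper
zkCli.sh -server rocksensor1.lan:2181

# List registered broker IDs, then show what broker 0 advertised -
# this should be a hostname Logstash can resolve, not a container ID
ls /brokers/ids
get /brokers/ids/0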

Here is one of my three Logstash configs (the other two are the same, just for bro and fsf):

input {
  kafka {
    topics => ["suricata-raw"]
    add_field => { "[@metadata][stage]" => "suricataraw_kafka" }
    # Set this to one per kafka partition to scale up
    #consumer_threads => 4
    group_id => "suricata_logstash"
    bootstrap_servers => "rocksensor1.lan:9092"
    codec => json
    auto_offset_reset => "earliest"
  }
}

filter {
  if "_jsonparsefailure" in [tags] {
    drop { }
  }

  # Remove kafka_topic field
  mutate {
    remove_field => [ "kafka_topic" ]
  }

  if [@metadata][stage] == "suricataraw_kafka" {
    # Set the timestamp
    date { match => [ "timestamp", "ISO8601" ] }
  }
}

output {
  if [@metadata][stage] == "suricataraw_kafka" {
    kafka {
      codec => json
      topic_id => "suricata-clean"
      bootstrap_servers => "rocksensor1.lan:9092"
    }

    elasticsearch {
      hosts => ["rockserver1.lan"]
      index => "suricata-%{+YYYY.MM.dd}"
      manage_template => false
      document_type => "%{event_type}"
    }
  }
}
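
One note on the commented-out consumer_threads setting: before scaling it up, it's worth checking how many partitions the topic actually has, since threads beyond the partition count just sit idle. A sketch of how I'd verify, using the tools that ship with Kafka (hosts match my config above):

# Show partition count, leaders, and replicas for the topic
kafka-topics.sh --describe --zookeeper rocksensor1.lan:2181 --topic suricata-raw

# Sanity-check that messages are flowing, reading from the earliest offset
kafka-console-consumer.sh --bootstrap-server rocksensor1.lan:9092 \
  --topic suricata-raw --from-beginning --max-messages 5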

I figured it out.
