Logstash Kafka Input Plugin Data has Type Already

I'm using the Kafka input plugin and noticed that my Elasticsearch indices were being named incorrectly. After debugging, I realized that the documents in the Kafka topic already included a type field.

Input.conf

input {
  kafka {
    type => "kafka"
    topic_id => "topic"
    zk_connect => "zk1, zk2, zk3"
  }
}

Data coming in may look something like:

{"date_created":"2016-08-03 16:19:18",
"unix_time":1470266358,
"date_short":"2016-08-03",
"local_day_of_week":"Wed",
"local_time":"1619",
"type":2,
"event_type":"topic"}

Notice that the type field has a value of 2. When I look at http://elasticsearch/_cat/indices, it shows green open 2-2016.08.03 instead of topic-2016.08.03.

Are there any ways around this?

Can you share the Elasticsearch output config?

Hey Joe,

Here's a (hostname modified) version of my output config.

output {
    elasticsearch {
      hosts   => [ "elasticsearchserver" ]
      index   => "%{type}-%{+yyyy.MM.dd}"
      workers => 1
    }
}

Thanks!

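The input's type setting does not overwrite a "type" field that is already present in the event. If you cannot change the messages so that they no longer carry a "type" field, I would try using another field, e.g. via add_field: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-add_field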

input {
  kafka {
    add_field => {
      "log_origin" => "kafka"
    }
    ...
  }
}
output {
    elasticsearch {
      hosts   => [ "elasticsearchserver" ]
      index   => "%{log_origin}-%{+yyyy.MM.dd}"
      workers => 1
    }
}
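
If the index name should still be driven by type, another option might be to move the incoming field out of the way in a filter before the output runs. This is only a sketch: the target field name event_class is a hypothetical placeholder that does not appear in the original messages, and it assumes the events are already JSON-decoded so that type is a top-level field.

```
filter {
  # Move the numeric "type" from the Kafka message to another field
  # ("event_class" is an assumed, hypothetical name).
  mutate {
    rename => { "type" => "event_class" }
  }
  # Now that "type" is free, set it to the value the index name should use.
  mutate {
    add_field => { "type" => "kafka" }
  }
}
```

With this in place, the original output setting index => "%{type}-%{+yyyy.MM.dd}" should resolve to kafka-2016.08.03 for the sample event, while the original value 2 stays available in event_class.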

Thanks Joe! I think this will work for my use case.

Kingston

Hi Joe,

I have a similar use case. My message format from the Kafka output is as follows:

"rowid":0,"sentinel_id":4413,"cust_id":1,"devi_id":5309,"first_occurance":"2017-02-13 05:30:14","type":6,"action":25,"protocol":null,"src":3158395560,"dst":2094651427,"src_name":null,"dst_name":null,"dst_port":110,"user":"Edward","user_group":"HR","policy_name":null,"misc_1":null,"misc_2":"port3Edward@example.com","misc_3":"port2Edward@exampls.com","misc_8":null,"misc_9":null,"rawlog":"<723>Feb 13 05:30:13 10.2.1.224 date=2017-02-13 time=05:30:13 devname=FGCorp001 device_id=FGT8004271490115 log_id=050926726 type=emailfilter subtype=POP3 pri=emergency fwver=040004 policyid=78 serial=19286623 user="Edward" group="HR" vd="root" src=188.65.74.168 sport=110 src_port=110 src_int="port3" dst=124.217.216.35 dport=110 dst_port=110 dst_int"port2" service="110/pop3" carrier_ep="EndPoint" profile="profile" status="blocked" from="port3Edward@example.com" to="port2Edward@exampls.com" tracker="Tracker" msg="from email address is in email blacklist."","rawlog_hash":8789335990287780976,"evtcount":1,"inbytes":0,"outbytes":0,"totalbytes":0}

How can I check a condition on a specific field, like this?

if([dst]==124.217.216.35){
}
In my case this condition is not giving the expected output.
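
One likely reason, judging from the sample message: the top-level dst field holds the address as an integer (2094651427, which is 124.217.216.35 written as a 32-bit number), while the dotted form appears only inside rawlog. A minimal sketch of a conditional that matches the sample, assuming the event is JSON-decoded so that dst is a numeric field; the tag name dst_match is a hypothetical placeholder:

```
filter {
  # Compare against the integer form stored in the top-level "dst" field.
  if [dst] == 2094651427 {
    mutate {
      add_tag => [ "dst_match" ]
    }
  }
}
```

If a field does hold the dotted address as a string, the value has to be quoted in the conditional, e.g. if [dst] == "124.217.216.35" { ... }.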