kingston
(Kingston Lee)
August 3, 2016, 11:28pm
1
I'm using the Kafka input plugin and noticed that my Elasticsearch indices were being named incorrectly. After debugging, I realized that the documents in the Kafka topic already included a type field.
Input.conf
input {
  kafka {
    type => "kafka"
    topic_id => "topic"
    zk_connect => "zk1, zk2, zk3"
  }
}
Data coming in may look something like:
{"date_created":"2016-08-03 16:19:18",
"unix_time":1470266358,
"date_short":"2016-08-03",
"local_day_of_week":"Wed",
"local_time":"1619",
"type":2,
"event_type":"topic"}
Notice that the type field has a value of 2. When I look at http://elasticsearch/_cat/indices, it shows green open 2-2016.08.03 instead of topic-2016.08.03.
Is there any way around this?
Can you share the Elasticsearch output config?
kingston
(Kingston Lee)
August 4, 2016, 5:25pm
3
Hey Joe,
Here's a (hostname modified) version of my output config.
output {
  elasticsearch {
    hosts => [ "elasticsearchserver" ]
    index => "%{type}-%{+yyyy.MM.dd}"
    workers => 1
  }
}
Thanks!
If you cannot stop the messages from already containing a "type" field, I would just use a different field, e.g. one set with add_field: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-add_field
input {
  kafka {
    add_field => {
      "log_origin" => "kafka"
    }
    ...
  }
}
output {
  elasticsearch {
    hosts => [ "elasticsearchserver" ]
    index => "%{log_origin}-%{+yyyy.MM.dd}"
    workers => 1
  }
}
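To make the difference between the two index patterns concrete, here is a small Python sketch. It is not Logstash code and only mimics the behavior in simplified form: it assumes that an input's type setting does not override a type field already present in the event, and that add_field simply adds a new field. The helper names apply_input_settings and resolve_index are hypothetical and exist only for this illustration.

```python
import re
from datetime import datetime, timezone


def apply_input_settings(event, type_value=None, add_field=None):
    """Simplified stand-in for how an input decorates an incoming event."""
    decorated = dict(event)
    # Assumption: the input's `type` is applied only when the event
    # does not already carry a type field.
    if type_value is not None:
        decorated.setdefault("type", type_value)
    # Assumption: add_field just sets the named fields on the event.
    for name, value in (add_field or {}).items():
        decorated[name] = value
    return decorated


def resolve_index(pattern, event, timestamp):
    """Fill %{field} and %{+yyyy.MM.dd} placeholders in an index pattern."""
    def substitute(match):
        key = match.group(1)
        if key == "+yyyy.MM.dd":
            return timestamp.strftime("%Y.%m.%d")
        # In this sketch, references to missing fields are left untouched.
        return str(event[key]) if key in event else match.group(0)

    return re.sub(r"%\{([^}]+)\}", substitute, pattern)


# Trimmed version of the sample message from the first post.
message = {"type": 2, "event_type": "topic"}
when = datetime(2016, 8, 3, tzinfo=timezone.utc)

original = apply_input_settings(message, type_value="kafka")
workaround = apply_input_settings(message, add_field={"log_origin": "kafka"})

print(resolve_index("%{type}-%{+yyyy.MM.dd}", original, when))          # 2-2016.08.03
print(resolve_index("%{log_origin}-%{+yyyy.MM.dd}", workaround, when))  # kafka-2016.08.03
```

Because log_origin does not collide with any field in the incoming message, the index name no longer depends on the message's own type value.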
kingston
(Kingston Lee)
August 4, 2016, 9:02pm
5
Thanks Joe! I think this will work for my use case.
Kingston
moni15moni
(mohankumar)
February 14, 2017, 6:01am
6
Hi Joe,
I have a similar use case. My message format, as it comes out of Kafka, is as follows:
"rowid":0,"sentinel_id":4413,"cust_id":1,"devi_id":5309,"first_occurance":"2017-02-13 05:30:14","type":6,"action":25,"protocol":null,"src":3158395560,"dst":2094651427,"src_name":null,"dst_name":null,"dst_port":110,"user":"Edward","user_group":"HR","policy_name":null,"misc_1":null,"misc_2":"port3Edward@example.com ","misc_3":"port2Edward@exampls.com ","misc_8":null,"misc_9":null,"rawlog":"<723>Feb 13 05:30:13 10.2.1.224 date=2017-02-13 time=05:30:13 devname=FGCorp001 device_id=FGT8004271490115 log_id=050926726 type=emailfilter subtype=POP3 pri=emergency fwver=040004 policyid=78 serial=19286623 user="Edward" group="HR" vd="root" src=188.65.74.168 sport=110 src_port=110 src_int="port3" dst=124.217.216.35 dport=110 dst_port=110 dst_int"port2" service="110/pop3" carrier_ep="EndPoint" profile="profile" status="blocked" from="port3Edward@example.com " to="port2Edward@exampls.com " tracker="Tracker" msg="from email address is in email blacklist."","rawlog_hash":8789335990287780976,"evtcount":1,"inbytes":0,"outbytes":0,"totalbytes":0}
How can I check a condition on a specific field, like this?
if([dst]==124.217.216.35){
}
In my case, this condition is not giving the expected output.
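One thing worth checking, offered as an observation from the sample message rather than a confirmed diagnosis: the dst field in the message holds the number 2094651427, while the dotted address 124.217.216.35 only appears inside the rawlog text. The two look like the same IPv4 address in different encodings. The short Python sketch below, using only the standard ipaddress module, shows the conversion; the helper names ip_to_int and int_to_ip are made up for this example.

```python
import ipaddress


def ip_to_int(dotted):
    """Convert a dotted IPv4 string to its 32-bit integer form."""
    return int(ipaddress.IPv4Address(dotted))


def int_to_ip(number):
    """Convert a 32-bit integer back to a dotted IPv4 string."""
    return str(ipaddress.IPv4Address(number))


# Values taken from the sample message above.
print(ip_to_int("124.217.216.35"))  # 2094651427  (matches "dst")
print(int_to_ip(2094651427))        # 124.217.216.35
print(int_to_ip(3158395560))        # 188.65.74.168  (matches "src" in rawlog)
```

If that is what is happening, the condition would presumably need to compare [dst] against the number 2094651427 rather than the dotted form, and a dotted address used anywhere in a condition would need to be quoted as a string.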