kingston (Kingston Lee), August 3, 2016, 11:28pm, #1
I'm using the Kafka input plugin and noticed that my Elasticsearch indices were being named incorrectly. After debugging, I realized that the documents in the Kafka topic already include a type field.
Input.conf
input {
  kafka {
    type => "kafka"
    topic_id => "topic"
    zk_connect => "zk1, zk2, zk3"
  }
}
 
Data coming in may look something like:
{"date_created":"2016-08-03 16:19:18",
"unix_time":1470266358,
"date_short":"2016-08-03",
"local_day_of_week":"Wed",
"local_time":"1619",
"type":2,
"event_type":"topic"}
 
Notice that the type field has a value of 2. When I look at http://elasticsearch/_cat/indices, it shows green open 2-2016.08.03 instead of topic-2016.08.03.
Is there a way around this?

Can you share the Elasticsearch output config?

kingston (Kingston Lee), August 4, 2016, 5:25pm, #3
Hey Joe,
Here's a (hostname modified) version of my output config.
output {
    elasticsearch {
      hosts   => [ "elasticsearchserver" ]
      index   => "%{type}-%{+yyyy.MM.dd}"
      workers => 1
    }
}
 
Thanks!

The type setting on an input does not override a "type" field that the event already has. If you cannot stop the messages from arriving with a "type" field, I would just use another field, e.g. with add_field (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-add_field):
input {
  kafka {
    add_field => {
      "log_origin" => "kafka"
    }
    ...
  }
}
 
output {
    elasticsearch {
      hosts   => [ "elasticsearchserver" ]
      index   => "%{log_origin}-%{+yyyy.MM.dd}"
      workers => 1
    }
}
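
If the output's index pattern has to stay as "%{type}-%{+yyyy.MM.dd}", another option is to move the incoming numeric type out of the way in a filter and then set type explicitly. This is only a sketch, assuming the mutate filter is available; the field name event_code is made up for illustration and is not part of the original messages.

```
filter {
  # Move the numeric "type" that arrives in the Kafka message to another field.
  # "event_code" is a hypothetical field name chosen for this example.
  mutate {
    rename => { "type" => "event_code" }
  }
  # With no "type" field left on the event, set the value used in the index name.
  # A separate mutate block keeps the order of the two steps explicit.
  mutate {
    add_field => { "type" => "kafka" }
  }
}
```

With this in place the index pattern should resolve to a name of the form kafka-yyyy.MM.dd, while the original value stays available under the renamed field.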

kingston (Kingston Lee), August 4, 2016, 9:02pm, #5
Thanks Joe! I think this will work for my use case.
Kingston

moni15moni (mohankumar), February 14, 2017, 6:01am, #6
Hi Joe,
I have a similar use case. The message I receive from Kafka has the following format:
{"rowid":0,"sentinel_id":4413,"cust_id":1,"devi_id":5309,"first_occurance":"2017-02-13 05:30:14","type":6,"action":25,"protocol":null,"src":3158395560,"dst":2094651427,"src_name":null,"dst_name":null,"dst_port":110,"user":"Edward","user_group":"HR","policy_name":null,"misc_1":null,"misc_2":"port3Edward@example.com ","misc_3":"port2Edward@exampls.com ","misc_8":null,"misc_9":null,"rawlog":"<723>Feb 13 05:30:13 10.2.1.224 date=2017-02-13 time=05:30:13 devname=FGCorp001 device_id=FGT8004271490115 log_id=050926726 type=emailfilter subtype=POP3 pri=emergency fwver=040004 policyid=78 serial=19286623 user="Edward" group="HR" vd="root" src=188.65.74.168 sport=110 src_port=110 src_int="port3" dst=124.217.216.35 dport=110 dst_port=110 dst_int"port2" service="110/pop3" carrier_ep="EndPoint" profile="profile"  status="blocked" from="port3Edward@example.com " to="port2Edward@exampls.com " tracker="Tracker" msg="from email address is in email blacklist."","rawlog_hash":8789335990287780976,"evtcount":1,"inbytes":0,"outbytes":0,"totalbytes":0}
How can I write a condition on a specific field, like this?
if([dst]==124.217.216.35){
}
In my case this condition does not give the expected output.
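
One likely reason the comparison fails: in the sample message, dst is the number 2094651427, which is 124.217.216.35 packed into a 32-bit integer (124*2^24 + 217*2^16 + 216*2^8 + 35), so the field never holds the dotted string. As far as I know, an unquoted dotted address is also not a number or string literal that a Logstash conditional accepts. Below is a minimal sketch of two ways to match, assuming the message has been decoded as JSON so that [dst] is numeric and [rawlog] is a string; the tag names are made up for illustration.

```
filter {
  # Option 1: compare against the integer form of 124.217.216.35.
  if [dst] == 2094651427 {
    mutate { add_tag => ["dst_match_int"] }
  }

  # Option 2: look for the dotted address inside the raw log line.
  if "dst=124.217.216.35" in [rawlog] {
    mutate { add_tag => ["dst_match_raw"] }
  }
}
```

Either condition alone should be enough; the second one avoids the integer conversion but depends on the exact layout of rawlog.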