I create this filter
filter {
# Filter only CEF logs here
if "syslog" in [tags]{
# Manipulate the message
mutate {
# Saved the original message into a temporary field
add_field => { "tmp_message" => "%{message}" }
# splits message on the "|" and has index numbers
split => ["message", "|"]
# SAMPLE
# generate fields for the CEF header
add_field => { "[cef][version]" => "%{[message][0]}" }
add_field => { "[cef][device][vendor]" => "%{[message][1]}" }
add_field => { "[cef][device][product]" => "%{[message][2]}" }
add_field => { "[cef][device][version]" => "%{[message][3]}" }
add_field => { "[cef][device][event_class_id]" => "%{[message][4]}" }
add_field => { "[cef][name]" => "%{[message][5]}" }
add_field => { "[cef][severity]" => "%{[message][6]}" }
add_tag => [ "CEF-Firewall" ]
}
# Parse the message with field=value formats for Firewall
kv {
# Note: values with spaces are lost (still getting there)
field_split => " "
trim_key => "<>\[\], "
trim_value => "<>\[\],"
# Only included the fields which are of interest (dont need everything)
allow_duplicate_values => false
include_keys => ["act","rt","spt","dpt","match_id","rule_action","ifnam e","dst","inzone","outzone","product","proto","service_id","src","duser","suser" ,"shost","dhost"]
}
prune {
whitelist_values => [ "match_id", "^[0-9]{3}$" ]
}
mutate {
# Rename fields to cef_field_names
rename => [ "act", "[event][action]"]
rename => [ "rt", "[cef][time]"]
rename => [ "spt", "[cef][source][port]"]
rename => [ "dpt", "[cef][destination][port]"]
rename => [ "match_id","[cef][rule][number]"]
rename => [ "rule_action","[cef][rule][action]"]
rename => [ "dst", "[cef][destination][geoip][ip]"]
rename => [ "ifname","[cef][interface]"]
rename => [ "inzone","[cef][source][zone]"]
rename => [ "outzone","[cef][destination][zone]"]
rename => [ "product","[cef][device][product2]"]
rename => [ "proto", "[cef][network][transport]"]
rename => [ "service_id", "[cef][service]"]
rename => [ "src", "[cef][source][geoip][ip]"]
rename => [ "suser", "[cef][source][user]"]
rename => [ "duser", "[cef][destination][user]"]
rename => [ "shost", "[cef][source][host]"]
rename => [ "dhost", "[cef][destination][host]"]
# Revert original message and remove temporary field
replace => { "message" => "%{tmp_message}" }
remove_field => [ "tmp_message" ]
}
geoip {
source => "[cef][source][geoip][ip]"
target => "[cef][source][geoip][location]"
}
geoip {
source => "[cef][destination][geoip][ip]"
target => "[cef][destination][geoip][location]"
}
date {
match => ["[cef][time]","UNIX_MS"]
remove_field => [ "[cef][time]" ]
}
}
}
I have problem with geoip: the processor is applied but the I don't found in my index the field geo_point.
The output is the following
output {
if "syslog" in [tags]{
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "cef-%{+YYYY.MM.dd}"
# index => "logstash-%{+YYYY.MM.dd}"
codec => "plain"
workers => 1
manage_template => true
template_name => "cef"
# template_name => "logstash"
template_overwrite => false
ssl => true
cacert => "/tmp/certs/root-ca.crt"
ssl_certificate_verification => true
user => admin
password => "xxxxxxxx"
}
stdout { codec => rubydebug }
}
}
I tried to used logstash instead of cef-* but I have the same result with geoip.ip filed and geoip.location field but the data of all row of index remaining without geoip. I'd like to improve this data with geo_ip in order to use maps app of elastic....
Thank you
Franco