Hi all,
I am trying to index only certain columns from a Zeek(Bro) log. I managed to send all the columns within a message to ES and Kibana.
here is an example line of the log file:
1561640400.525924 CGtlxS2Yk3ztKrfMK3 10.10.0.55 34218 10.10.0.2 53 udp dns 0.008105 0 114 SHRCd 0 0 1 142 -
and here is my logstash .conf file:
input {
beats {
port => 5044
}
}
filter {
if [message] =~ /^#/ {
drop { }
}
if [type] == "bro-conn" {
csv {
columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","local_resp","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]
separator => " "
}
date {
match => [ "ts", "UNIX" ]
}
geoip {
source => "id.resp_h"
}
mutate {
convert => { "id.orig_p" => "integer" }
convert => { "id.resp_p" => "integer" }
convert => { "orig_bytes" => "integer" }
convert => { "duration" => "float" }
convert => { "resp_bytes" => "integer" }
convert => { "missed_bytes" => "integer" }
convert => { "orig_pkts" => "integer" }
convert => { "orig_ip_bytes" => "integer" }
convert => { "resp_pkts" => "integer" }
convert => { "resp_ip_bytes" => "integer" }
}
}
if [type] == "bro-ssh" {
csv {
columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","version","auth_success","auth_attempts","direction","client","server","cipher_alg","mac_alg","compression_alg","kex_alg","host_key_alg","host_key","remote_location.country_code","remote_location.region","remote_loation.city","remote_location.latitude","remote_location.longitude"]
separator => " "
}
date {
match => [ "ts", "UNIX" ]
}
geoip {
source => "id.resp_h"
}
mutate {
convert => { "id.orig_p" => "integer" }
convert => { "id.resp_p" => "integer" }
convert => { "auth_attempts" => "integer" }
}
}
}
output {
if [fields][type] == "bro-conn" {
elasticsearch {
hosts => ["elasticsearch:9200"]
manage_template => false
index => "%{[fields][type]}-%{+YYYY.MM.dd}"
document_type => "bro"
}
}
if [fields][type] == "bro-ssh" {
elasticsearch {
hosts => ["elasticsearch:9200"]
manage_template => false
index => "%{[fields][type]}-%{+YYYY.MM.dd}"
document_type => "bro"
}
}
}
output {
stdout{}
}
This indexes everything fine and I can see it on Kibana. However I do not want all of the columns from the CSV section in the message field when outputting to ES/Kibana.
I tried prune filter but I think thats more to remove whitelist/blacklist an entire field such as message? and not the content of the message field?
I also tried mutate gsub as seen here but not sure how I can do that with csv filter.
I am new to ELK so any help would be nice.
Kind regards,
Merril.