Hi all,
I am trying to index only certain columns from a Zeek (Bro) log. I have managed to send all of the columns within the message field to Elasticsearch and Kibana.
here is an example line of the log file:
1561640400.525924	CGtlxS2Yk3ztKrfMK3	10.10.0.55	34218	10.10.0.2	53	udp	dns	0.008105	0	114	SHRCd	0	0	1	142	-
and here is my logstash .conf file:
input {
  # Receive events shipped by Filebeat (or any Beats agent) on the standard Beats port.
  beats {
    port => 5044
  }
}
filter {
  # Zeek/Bro log files start each metadata line (#fields, #types, #close, ...) with '#';
  # drop those so only data rows reach the csv parser.
  if [message] =~ /^#/ {
    drop { }
  }

  # NOTE(review): the output section below tests [fields][type], while these
  # conditionals test [type]. Confirm which field Filebeat actually sets
  # (fields.type vs fields_under_root) and use the same one in both places.
  if [type] == "bro-conn" {
    csv {
      columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","local_resp","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]
      # Zeek logs are tab-delimited (see the sample line above); the separator
      # must be a single literal tab character, not spaces.
      separator => "	"
    }
    date {
      # Zeek 'ts' is an epoch timestamp with fractional seconds.
      match => [ "ts", "UNIX" ]
    }
    geoip {
      # NOTE(review): 10.x.x.x responders are RFC1918 private addresses and will
      # not resolve in the GeoIP database — expect _geoip_lookup_failure tags.
      source => "id.resp_h"
    }
    mutate {
      convert => { "id.orig_p" => "integer" }
      convert => { "id.resp_p" => "integer" }
      convert => { "orig_bytes" => "integer" }
      convert => { "duration" => "float" }
      convert => { "resp_bytes" => "integer" }
      convert => { "missed_bytes" => "integer" }
      convert => { "orig_pkts" => "integer" }
      convert => { "orig_ip_bytes" => "integer" }
      convert => { "resp_pkts" => "integer" }
      convert => { "resp_ip_bytes" => "integer" }
      # The raw log line has been parsed into named columns; drop it so only
      # the extracted fields are indexed into Elasticsearch.
      remove_field => [ "message" ]
    }
  }
  if [type] == "bro-ssh" {
    csv {
      # Fixed typo: "remote_loation.city" -> "remote_location.city".
      columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","version","auth_success","auth_attempts","direction","client","server","cipher_alg","mac_alg","compression_alg","kex_alg","host_key_alg","host_key","remote_location.country_code","remote_location.region","remote_location.city","remote_location.latitude","remote_location.longitude"]
      # Literal tab separator, as for the conn log.
      separator => "	"
    }
    date {
      match => [ "ts", "UNIX" ]
    }
    geoip {
      source => "id.resp_h"
    }
    mutate {
      convert => { "id.orig_p" => "integer" }
      convert => { "id.resp_p" => "integer" }
      convert => { "auth_attempts" => "integer" }
      # Drop the raw line once parsed (same rationale as bro-conn).
      remove_field => [ "message" ]
    }
  }
}
output {
  # Both Zeek log types ship to the same Elasticsearch cluster with identical
  # settings; the two original conditionals had byte-identical bodies, so they
  # are merged into a single condition. The index name still varies per type
  # via the %{[fields][type]} interpolation.
  if [fields][type] == "bro-conn" or [fields][type] == "bro-ssh" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      manage_template => false
      index => "%{[fields][type]}-%{+YYYY.MM.dd}"
      document_type => "bro"
    }
  }
}
# Debug output: echoes every event to the console so the pipeline can be
# inspected during development. Consider removing or guarding this in production.
output {
  stdout{}
}
This indexes everything fine and I can see it in Kibana. However, I do not want all of the columns from the CSV section to remain inside the message field when outputting to Elasticsearch/Kibana.
I tried the prune filter, but I think that is more for whitelisting/blacklisting entire fields (such as message), rather than removing part of the content within the message field?
I also tried mutate gsub, as suggested elsewhere, but I am not sure how to combine that with the csv filter.
I am new to ELK so any help would be nice.
Kind regards,
Merril.
