Output only part of the message field using CSV filter

Hi all,

I am trying to index only certain columns from a Zeek (Bro) log. I have managed to send all of the columns within a message to ES and Kibana.

Here is an example line of the log file:

1561640400.525924 CGtlxS2Yk3ztKrfMK3 10.10.0.55 34218 10.10.0.2 53 udp dns 0.008105 0 114 SHRCd 0 0 1 142 -

and here is my Logstash .conf file:

input {
  beats {
    port => 5044
  }
}
filter {
  if [message] =~ /^#/ {
    drop { }
  }
  if [type] == "bro-conn" {
    csv {
      columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","local_resp","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]
      separator => "	"  # a literal tab character; Zeek logs are tab-separated
    }

    date {
      match => [ "ts", "UNIX" ]  # ts is epoch seconds
    }

    geoip {
      source => "id.resp_h"
    }

    mutate {
      convert => { "id.orig_p" => "integer" }
      convert => { "id.resp_p" => "integer" }
      convert => { "orig_bytes" => "integer" }
      convert => { "duration" => "float" }
      convert => { "resp_bytes" => "integer" }
      convert => { "missed_bytes" => "integer" }
      convert => { "orig_pkts" => "integer" }
      convert => { "orig_ip_bytes" => "integer" }
      convert => { "resp_pkts" => "integer" }
      convert => { "resp_ip_bytes" => "integer" }
    }
  }


  if [type] == "bro-ssh" {
    csv {
      columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","version","auth_success","auth_attempts","direction","client","server","cipher_alg","mac_alg","compression_alg","kex_alg","host_key_alg","host_key","remote_location.country_code","remote_location.region","remote_location.city","remote_location.latitude","remote_location.longitude"]
      separator => "	"  # a literal tab character
    }

    date {
      match => [ "ts", "UNIX" ]
    }

    geoip {
      source => "id.resp_h"
    }

    mutate {
      convert => { "id.orig_p" => "integer" }
      convert => { "id.resp_p" => "integer" }
      convert => { "auth_attempts" => "integer" }
    }
  }
}

output {
  if [fields][type] == "bro-conn" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      manage_template => false
      index => "%{[fields][type]}-%{+YYYY.MM.dd}"
      document_type => "bro"
    }
  }

  if [fields][type] == "bro-ssh" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      manage_template => false
      index => "%{[fields][type]}-%{+YYYY.MM.dd}"
      document_type => "bro"
    }
  }
}

output {
  stdout {}
}

This indexes everything fine and I can see it in Kibana. However, I do not want all of the columns from the CSV section to end up in the message field when outputting to ES/Kibana.

I tried the prune filter, but I think that is more for whitelisting/blacklisting an entire field such as message, not for trimming the content of the message field?
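As far as I can tell, prune matches field names against regexes and keeps or drops whole fields, something like this (a sketch; the field names are just examples):

filter {
  prune {
    # keep only fields whose names match these regexes; everything else
    # is removed from the event - message itself would be kept or dropped
    # whole, never edited
    whitelist_names => ["^ts$", "^uid$", "^id"]
  }
}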

I also tried mutate gsub as seen here, but I am not sure how to do that together with the csv filter.
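From what I understand, gsub rewrites the text of a field with a regex, so something like this could strip leading columns (a sketch; the pattern is my own guess at the tab-separated layout):

mutate {
  # drop the first two tab-separated columns (ts and uid) from message
  gsub => [ "message", "^[^\t]+\t[^\t]+\t", "" ]
}

But picking out arbitrary columns that way seems clumsy compared to the csv filter.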

I am new to ELK, so any help would be appreciated.

Kind regards,
Merril.

You can overwrite the message field with a subset of the columns from the csv. For example:

mutate { replace => { "message" => "%{id.orig_h},%{id.orig_p}" } }
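Note that the mutate needs to run after the csv filter inside the same conditional, so the columns exist when the sprintf references are resolved; if a referenced field is missing, Logstash leaves the literal %{id.orig_h} text in message rather than replacing it.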

I tried your solution and it doesn't seem to change the message. :frowning:

input {
  beats {
    port => 5044
  }
}
filter {
  if [message] =~ /^#/ {
    drop { }
  }
  if [type] == "bro-conn" {
    csv {
      columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","local_resp","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]
      separator => "	"  # a literal tab character
    }

    date {
      match => [ "ts", "UNIX" ]
    }

    geoip {
      source => "id.resp_h"
    }

    mutate {
      replace => { "message" => "%{id.orig_h},%{id.resp_h}" }
    }

    mutate {
      convert => { "id.orig_p" => "integer" }
      convert => { "id.resp_p" => "integer" }
      convert => { "orig_bytes" => "integer" }
      convert => { "duration" => "float" }
      convert => { "resp_bytes" => "integer" }
      convert => { "missed_bytes" => "integer" }
      convert => { "orig_pkts" => "integer" }
      convert => { "orig_ip_bytes" => "integer" }
      convert => { "resp_pkts" => "integer" }
      convert => { "resp_ip_bytes" => "integer" }
    }
  }

  if [type] == "bro-ssh" {
    csv {
      columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","version","auth_success","auth_attempts","direction","client","server","cipher_alg","mac_alg","compression_alg","kex_alg","host_key_alg","host_key","remote_location.country_code","remote_location.region","remote_location.city","remote_location.latitude","remote_location.longitude"]
      separator => "	"  # a literal tab character
    }

    date {
      match => [ "ts", "UNIX" ]
    }

    geoip {
      source => "id.resp_h"
    }

    mutate {
      replace => { "message" => "%{id.orig_h},%{id.resp_h}" }
    }

    mutate {
      convert => { "id.orig_p" => "integer" }
      convert => { "id.resp_p" => "integer" }
      convert => { "auth_attempts" => "integer" }
      # Also tried this
      replace => { "message" => "%{id.orig_h},%{id.resp_h}" }
    }
  }
}
output {
  if [fields][type] == "bro-conn" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      manage_template => false
      index => "%{[fields][type]}-%{+YYYY.MM.dd}"
      document_type => "bro"
    }
  }

  if [fields][type] == "bro-ssh" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      manage_template => false
      index => "%{[fields][type]}-%{+YYYY.MM.dd}"
      document_type => "bro"
    }
  }
}

output {
  stdout {}
}

I am still getting the full message as output:

"message" => "1561650131.185809\tCEyV6e48GpcqfcqT7h\t10.10.0.55\t42164\t3.8.152.103\t5044\ttcp\t-\t-\t-\t-\tOTH\tT\tF\t0\tC\t0\t0\t0\t
