Processing bro output logs (from pcaps) with logstash

I'm trying to process Bro output with Logstash and send it to Elasticsearch, but for some reason I can't get it to work. Am I missing something?

Below is some sample output that I'm trying to process. I generated it from a pcap file with Bro, after changing some settings so that it writes its logs in this JSON format.

{"ts":"2017-03-23T09:07:09.539617Z","uid":"C9jTJj4ooYZ32LlG1k","id.orig_h":"xxx.xxx.xxx.xx","id.orig_p":52440,"id.resp_h":"xx.xxx.xxx.xxx","id.resp_p":80,"trans_depth":1,"method":"GET","host":"www.test.com","uri":"/","version":"1.1","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0","request_body_len":0,"response_body_len":142565,"status_code":200,"status_msg":"OK","tags":[],"resp_fuids":["FoB6Hh2xeEtKGfOZSh"],"resp_mime_types":["text/html"]}
{"ts":"2017-03-23T09:07:10.613164Z","uid":"CLN68d2HXrKwfY2D7a","id.orig_h":"xxx.xxx.xxx.xx","id.orig_p":52441,"id.resp_h":"xxx.xxx.xxx.xxx","id.resp_p":80,"trans_depth":1,"method":"GET","host":"cdn2.test.com","uri":"/images/default/logo.svg","referrer":"http://www.test.com/","version":"1.1","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0","request_body_len":0,"response_body_len":12179,"status_code":200,"status_msg":"OK","tags":[],"resp_fuids":["FO3xZlIqETcEaboTc"],"resp_mime_types":["text/plain"]}
{"ts":"2017-03-23T09:07:10.629316Z","uid":"CLN68d2HXrKwfY2D7a","id.orig_h":"xxx.xxx.xxx.xx","id.orig_p":52441,"id.resp_h":"xxx.xxx.xxx.xxx","id.resp_p":80,"trans_depth":2,"method":"GET","host":"cdn2.test.com","uri":"/images/images_45/s4/5811445_s21.jpg?v=1","referrer":"http://www.test.com/","version":"1.1","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0","request_body_len":0,"response_body_len":14450,"status_code":200,"status_msg":"OK","tags":[],"resp_fuids":["FaZT3o1MBcRyFO7Pi3"],"resp_mime_types":["image/jpeg"]}
{"ts":"2017-03-23T09:07:10.650839Z","uid":"C7nYia4yiQcO5WVAwg","id.orig_h":"xxx.xxx.xxx.xx","id.orig_p":52442,"id.resp_h":"xxx.xxx.xxx.xxx","id.resp_p":80,"trans_depth":1,"method":"GET","host":"cdn2.test.com","uri":"/images/default/fixture-lg.png","referrer":"http://www.test.com/","version":"1.1","user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0","request_body_len":0,"response_body_len":3533,"status_code":200,"status_msg":"OK","tags":[],"resp_fuids":["F4bKlC4HWJ0augTjAj"],"resp_mime_types":["image/png"]}

I'm using the config file below. I also tried it without the codec option, with no success either. Is there a way for the config to automatically detect the columns in JSON files like the ones above, instead of naming every column explicitly?

input {
  file {
    path => "/home/user/bro-logs/testdata-01-json/http.log" 
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  csv {
    columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","trans_depth","method","host","uri","referrer","version","user_agent","request_body_len","response_body_len","status_code","status_msg","info_code","info_msg","tags","username","password","proxied","orig_fuids","orig_filenames","orig_mime_types","resp_fuids","resp_filenames","resp_mime_types"]
  }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "bro-http-%{+YYYY.MM.dd HH:mm:ss}"
  }
  stdout { codec => rubydebug }
}

I want to output the data to Elasticsearch and create a new index each time I run the above config.

When I run the config it doesn't do anything. I just see a blinking cursor after the output shown below:

user@ubuntu:~$ sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/bro-http.conf --config.reload.automatic
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties

The logstash-plain.log file doesn't show any errors and looks fine to me. Even with --debug I can't see any FATAL errors; it just keeps repeating 'pushing flush onto pipeline'.

[2017-04-01T09:59:15,148][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-04-01T09:59:15,153][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-04-01T09:59:15,294][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x739b533e URL:http://localhost:9200/>}
[2017-04-01T09:59:15,295][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-04-01T09:59:15,393][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-04-01T09:59:15,397][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x34da530b URL://localhost>]}
[2017-04-01T09:59:15,400][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-04-01T09:59:15,631][INFO ][logstash.pipeline        ] Pipeline main started
[2017-04-01T09:59:15,788][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9614}

I've been reading through the documentation, but I can't figure out what I'm doing wrong. Is my config file wrong, do I need to activate or install something else, or do I need to change the ELK yml settings files?

Any help is much appreciated, thanks!

So after a reboot things started working. Sorry for the newbie questions; I'm just starting to learn the Elastic Stack.

  1. Can someone explain why loading a config file and getting data into Elasticsearch didn't work before? At some point I issued the following command (and I also ran it multiple times without the last --config.xx parameter):

sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/test.conf --config.reload.automatic

I guess this started working after the reboot, or after stopping and starting Logstash? But now it creates a new index every time I save a modified config file. For testing purposes that is okay, but in the end I only want to run Logstash manually when there are new pcap files to process. I guess I need to start Logstash for the processing and stop it manually once it is done.

I thought my config file would create one index per run. When I start Logstash now it automatically creates an index, but sometimes more than one, the next one a second later.

  2. How can I prevent Logstash from creating multiple indexes for the same log file?

    user@ubuntu:~$ curl -XGET 'localhost:9200/_cat/indices?v&pretty'
    health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
    yellow open bro-http-2017.04.01-10:59:50 3ihFRzAdQ5i4_YfFZRjGLg 5 1 280 0 341.3kb 341.3kb
    yellow open bro-http-2017.04.01-10:06:27 LDo4ioreRWan6UUxjagU0A 5 1 129 0 249.9kb 249.9kb
    yellow open bro-http-2017.04.01-11:02:00 lKfR2KRPSuOGjVp612urLA 5 1 280 0 388.8kb 388.8kb
    yellow open bro-http-2017.04.01-11:02:12 zpSdKZyhT-ecz3_9PIVraQ 5 1 280 0 353.2kb 353.2kb
    yellow open .kibana X76VRHl_SYKXTPftn3_KbA 1 1 4 0 27.6kb 27.6kb
    yellow open bro-http-2017.04.01-12:50:12 6KHlQFrVQDixYo8sXyKTKQ 5 1 280 0 67.9kb 67.9kb
    yellow open bro-http-2017.04.01-10:06:25 sdsR0jpDQ9CgDa-pwM-Yog 5 1 75 0 140kb 140kb
    yellow open bro-http-2017.04.01-10:06:26 mC71N7YPSgWMM1aYktdJPQ 5 1 48 0 129.6kb 129.6kb
    yellow open bro-http-2017.04.01-10:06:28 3uSo6Dj2QKy9ibYIBLjc_A 5 1 28 0 102.8kb 102.8kb

  3. Is there also a way to change the above timestamps to my actual timezone? They are currently running 2 hours behind.

  4. Is it possible for a filter to automatically detect the columns in the input JSON file, so that I can write one config for multiple Bro log files? Or do I really need to specify all the columns in the filter?

Since the events are already in JSON format, the json codec parses them for you, so you should be able to remove the csv filter entirely.
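
For example, a minimal sketch of the input side (untested, using the same path as in your config) would be:

input {
  file {
    path => "/home/user/bro-logs/testdata-01-json/http.log"
    start_position => "beginning"
    codec => "json"   # each line is parsed as JSON, so the Bro fields become event fields
  }
}

With that in place, no filter block is needed to get the fields into the event.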

It is common to have indices per day, week or even month, but you have created an index per second, which will soon blow up. Change the index line to index => "bro-http-%{+YYYY.MM.dd}" to get a daily index instead.
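
The output section would then look something like this (same hosts as in your config):

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "bro-http-%{+YYYY.MM.dd}"   # one index per day instead of one per second
  }
  stdout { codec => rubydebug }
}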
