I am using the JSON plugin to parse Bro logs that are being printed in JSON format. My current setup is:
- a distributed sensor collects data and records it in logs in JSON format.
- Logstash picks up the logs with the following input:
input {
  file {
    path => "/nsm/bro/logs/current/*.log"
    exclude => [
      "/nsm/bro/logs/current/stderr.log",
      "/nsm/bro/logs/current/stdout.log",
      "/nsm/bro/logs/current/communication.log",
      "/nsm/bro/logs/current/loaded_scripts.log"
    ]
    codec => "json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "BRO"
  }
}
Filter stuff
output {
  if [type] == "BRO" {
    redis {
      host => "X.X.X.X"
      data_type => "list"
      key => "bro"
    }
  }
}
- events get sent to a central Redis queue and forwarded with the following:
input {
  redis {
    host => [ "X.X.X.X" ]
    data_type => "list"
    key => "bro"
    add_field => { "[@metadata][stage]" => "bro_redis" }
  }
  redis {
    host => [ "X.X.X.X" ]
    data_type => "list"
    key => "bro"
    add_field => { "[@metadata][stage]" => "bro_redis" }
  }
}

output {
  if [@metadata][stage] == "bro_redis" {
    redis {
      host => [ "X.X.X.X" ]
      data_type => "list"
      key => "bro_redis"
    }
  }
}
- Data is read from Redis by a number of Logstash indexer nodes with the following and output to the ES cluster:
input {
  redis {
    host => [ "X.X.X.X" ]
    data_type => "list"
    key => "bro_redis"
  }
  # ... the same redis block is repeated 11 times in total ...
}

Then custom filtering to add fields and some GEOIP stuff, followed by:

output {
  if [type] == "BRO" {
    if [sensor] == "SENSOR1" {
      elasticsearch {
        hosts => [ list of elasticsearch nodes ]
        manage_template => false
        index => "sensor1-bro-%{+YYYY.MM.dd}"
      }
    }
    if [sensor] == "SENSOR2" {
      elasticsearch {
        hosts => [ list of elasticsearch nodes ]
        manage_template => false
        index => "sensor2-bro-%{+YYYY.MM.dd}"
      }
    }
  }
}
I am getting some strange errors in the /var/log/logstash/logstash.log file.
- :message=>"IP Field contained invalid IP address or hostname" (I get this when the IP address field contains a "-").
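For the first error, a workaround I'm considering (just a sketch; the field name id.orig_h and the geoip filter shown are assumptions based on the standard Bro conn log, since my real filter section is more involved) is to skip geoip whenever the address field is just a "-":

filter {
  # only run geoip when the field exists and is a real address, not "-"
  if [id.orig_h] and [id.orig_h] != "-" {
    geoip {
      source => "id.orig_h"
    }
  }
}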
- :message=>"Trouble parsing csv", :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>, :level=>:warn
I get this with a lot of records. My problem is that the data is not CSV, as the error suggests. As you can see from the conf files above, the data is JSON, so why would Logstash think it's CSV? On my indexer nodes (step 4 above), do I need to tell the redis input that the data is in JSON format? I thought, according to the docs, that the redis plugin reads JSON by default?
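For reference, if I do need to force it, I assume explicitly setting the codec on each redis input on the indexers would look like this (mirroring the host placeholders above):

input {
  redis {
    host => [ "X.X.X.X" ]
    data_type => "list"
    key => "bro_redis"
    codec => "json"
  }
}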
I know this is a very long post, but any help would be greatly appreciated.
Thanks