Filebeat configuration file: filebeat.yml
# Filebeat configuration: tail the current Bro connection log and ship each
# line to Logstash over the Beats protocol.
filebeat.prospectors:
  - input_type: log
    # Log file(s) to harvest.
    paths:
      - "/usr/local/bro/logs/current/conn.log"
    # Custom field attached to every event read by this prospector.
    fields:
      type: "bro-conn"
    # Store the custom fields at the top level of the event (as `type`)
    # instead of nesting them under a `fields` sub-dictionary.
    fields_under_root: true

#----------------------------- Logstash output --------------------------------
# This banner must sit on its own line: placed inline, the `#` would comment
# out the whole output section that follows it.
output.logstash:
  # The Logstash hosts
  hosts: ["logstash:5044"]
Logstash pipeline configuration file: bro-conn-log.conf
# Logstash pipeline for Bro conn.log events: receive events over the Beats
# protocol, parse the tab-separated record, enrich it (timestamp, GeoIP,
# human-readable connection state) and index it into Elasticsearch.

input {
  beats {
    # Port on which Logstash listens for Beats connections.
    port => 5044
    # NOTE: a former `columns => "localhost"` line was removed here. `columns`
    # is a csv filter option, not a beats input setting, and Logstash refuses
    # to start on unknown settings. If a bind address was intended, the beats
    # option is `host`; leaving it unset listens on all interfaces.
  }
}

filter {

  # Let's get rid of those header lines; they begin with a hash
  if [message] =~ /^#/ {
    drop { }
  }

  # Now, using the csv filter, we can define the Bro log fields.
  # The value compared here must equal the `type` field set by the shipper
  # ("bro-conn"); otherwise none of the parsing below is applied.
  if [type] == "bro-conn" {
    csv {
      # NOTE(review): newer Bro releases also emit a `local_resp` column right
      # after `local_orig`; confirm this list against the #fields header of
      # your conn.log before relying on the column alignment.
      columns => [
        "ts", "uid",
        "id.orig_h", "id.orig_p", "id.resp_h", "id.resp_p",
        "proto", "service", "duration",
        "orig_bytes", "resp_bytes", "conn_state",
        "local_orig", "missed_bytes", "history",
        "orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes",
        "tunnel_parents"
      ]

      # If you use a custom delimiter, change the following value in between
      # the quotes to your delimiter. Otherwise it must be a literal <tab>
      # character (Bro's default separator), not a space; edit this file with
      # an editor such as nano that doesn't convert tabs to spaces.
      separator => "	"
    }

    # Parse the epoch-seconds 'ts' field into the event timestamp, so we can
    # use Kibana's time-based features natively
    date {
      match => [ "ts", "UNIX" ]
    }

    # add geoip attributes
    geoip {
      source => "id.orig_h"
      target => "orig_geoip"
    }
    geoip {
      source => "id.resp_h"
      target => "resp_geoip"
    }

    # The following makes use of the translate filter (logstash contrib) to
    # convert conn_state into human text. Saves having to look up values for
    # packet introspection
    translate {
      field => "conn_state"

      destination => "conn_state_full"

      dictionary => {
        "S0" => "Connection attempt seen, no reply"
        "S1" => "Connection established, not terminated"
        "S2" => "Connection established and close attempt by originator seen (but no reply from responder)"
        "S3" => "Connection established and close attempt by responder seen (but no reply from originator)"
        "SF" => "Normal SYN/FIN completion"
        "REJ" => "Connection attempt rejected"
        "RSTO" => "Connection established, originator aborted (sent a RST)"
        "RSTR" => "Established, responder aborted"
        "RSTOS0" => "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder"
        "RSTRH" => "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator"
        "SH" => "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)"
        "SHR" => "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator"
        "OTH" => "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
      }
    }

    # Type conversion and renaming are kept in two separate mutate blocks on
    # purpose: inside a single mutate, `rename` is executed before `convert`,
    # so the port fields would already be renamed and never get converted.
    mutate {
      convert => {
        "id.orig_p"     => "integer"
        "id.resp_p"     => "integer"
        "orig_bytes"    => "integer"
        "duration"      => "float"
        "resp_bytes"    => "integer"
        "missed_bytes"  => "integer"
        "orig_pkts"     => "integer"
        "orig_ip_bytes" => "integer"
        "resp_pkts"     => "integer"
        "resp_ip_bytes" => "integer"
      }
    }

    # Replace the dotted Bro field names with underscore names.
    mutate {
      rename => {
        "id.orig_h" => "id_orig_host"
        "id.orig_p" => "id_orig_port"
        "id.resp_h" => "id_resp_host"
        "id.resp_p" => "id_resp_port"
      }
    }
  }
}

output {
  # Uncomment to print every processed event while debugging.
  # stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}