Cannot create pipeline for send bro logs to elk


(Bz Os) #1

i attempt to send logs from bro ids to elk i am using filebeat.yml for input configuration when i specifie the path of the logs and the bro-con_log.conf for the configuration of logstash

so when i open /var/log/logstash/logstash-plaint.log i have as output :

logstash.inputs.file missing a requred setting for the file nput plugin : 
  input {
{file {path => # Setting Missing 
          ...
 }
}
logstash.aggent Cnnot create pipeline {:reason => something is wtong with your configuration.'}

(Ry Biesemeyer) #2

What does your pipeline configuration look like, specifically the file section in your pipeline's inputs? The above error indicates that the path directive is required, but not specified.


(Bz Os) #3

when can i found the pipeline configuration


(Ry Biesemeyer) #4

your pipeline configuration is where you define your pipeline's inputs, outputs, and filters; typically people use the .conf extension for their pipeline configurations, so maybe it is your bro-con_log.conf?


(Bz Os) #5

configuration file : filebeat.yml

filebeat.prospectors:
- input_type: log
  paths: 
    - "/usr/local/bro/logs/current/conn.log"
  fields:
    type: "bro-conn"
  fields_under_root: true

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["logstash:5044"]

for the bro-conn-log.conf

> 
> input {
>   beats {
>     port => 5044
>     columns => "localhost"
>   }
> }
> 
> filter {
> 
>   #Let's get rid of those header lines; they begin with a hash
>   if [message] =~ /^#/ {
>     drop { }
>   }
> 
>   #Now, using the csv filter, we can define the Bro log fields
>   if [type] == "bro-conn_log" {
>     csv {
>       columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]
> 
>       #If you use a custom delimiter, change the following value in between the quotes to your delimiter. Otherwise, insert a literal <tab> in between the two quotes on your logstash system, use a text editor like nano that doesn't convert tabs to spaces.
>       separator => "	"
>     }
> 
>     #Let's convert our timestamp into the 'ts' field, so we can use Kibana features natively
>     date {
>       match => [ "ts", "UNIX" ]
> 
>     }
> 
>     # add geoip attributes
>     geoip {
>       source => "id.orig_h"
>       target => "orig_geoip"
>     }
>     geoip {
>       source => "id.resp_h"
>       target => "resp_geoip"
>     }
> 
>     #The following makes use of the translate filter (logstash contrib) to convert conn_state into human text. Saves having to look up values for packet introspection
>     translate {
>       field => "conn_state"
> 
>       destination => "conn_state_full"
> 
>       dictionary => [
>                     "S0", "Connection attempt seen, no reply",
>                     "S1", "Connection established, not terminated",
>                     "S2", "Connection established and close attempt by originator seen (but no reply from responder)",
>                     "S3", "Connection established and close attempt by responder seen (but no reply from originator)",
>                     "SF", "Normal SYN/FIN completion",
>                     "REJ", "Connection attempt rejected",
>                     "RSTO", "Connection established, originator aborted (sent a RST)",
>                     "RSTR", "Established, responder aborted",
>                     "RSTOS0", "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder",
>                     "RSTRH", "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator",
>                     "SH", "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)",
> 		                "SHR", "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator",
>                     "OTH", "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
>                     ]
>     }
> 
>     mutate {
>       convert => [ "id.orig_p", "integer" ]
>       convert => [ "id.resp_p", "integer" ]
>       convert => [ "orig_bytes", "integer" ]
>       convert => [ "duration", "float" ]
>       convert => [ "resp_bytes", "integer" ]
>       convert => [ "missed_bytes", "integer" ]
>       convert => [ "orig_pkts", "integer" ]
>       convert => [ "orig_ip_bytes", "integer" ]
>       convert => [ "resp_pkts", "integer" ]
>       convert => [ "resp_ip_bytes", "integer" ]
>       rename =>  [ "id.orig_h", "id_orig_host" ]
>       rename =>  [ "id.orig_p", "id_orig_port" ]
>       rename =>  [ "id.resp_h", "id_resp_host" ]
>       rename =>  [ "id.resp_p", "id_resp_port" ]
>     }
>   }
> }
> 
> output {
>   # stdout { codec => rubydebug }
>   elasticsearch { 
>     hosts => ["localhost:9200"]
>  }
> }

(Ry Biesemeyer) #6

do you have any other pipeline configs being loaded by Logstash? this one doesn't have a file input plugin, and the error message is from something with a file input plugin.


(Bz Os) #7

yes i have bro-conn_log.conf:

input {
file {
type => "bro-conn_log"
start_position => "end"
sincedb_path => "/var/tmp/.bro_conn_sincedb"

#Edit the following path to reflect the location of your log files. You can also change the extension if you use something else
path => "/nsm/bro/logs/current/conn.log"

}
}

filter {

#Let's get rid of those header lines; they begin with a hash
if [message] =~ /^#/ {
drop { }
}

#Now, using the csv filter, we can define the Bro log fields
if [type] == "bro-conn_log" {
csv {
columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]

  #If you use a custom delimiter, change the following value in between the quotes to your delimiter. Otherwise, insert a literal <tab> in between the two quotes on your logstash system, use a text editor like nano that doesn't convert tabs to spaces.
  separator => "	"
}

#Let's convert our timestamp into the 'ts' field, so we can use Kibana features natively
date {
  match => [ "ts", "UNIX" ]
}

# add geoip attributes
geoip {
  source => "id.orig_h"
  target => "orig_geoip"
}
geoip {
  source => "id.resp_h"
  target => "resp_geoip"
}

#The following makes use of the translate filter (logstash contrib) to convert conn_state into human text. Saves having to look up values for packet introspection
translate {
  field => "conn_state"

  destination => "conn_state_full"

  dictionary => [
                "S0", "Connection attempt seen, no reply",
                "S1", "Connection established, not terminated",
                "S2", "Connection established and close attempt by originator seen (but no reply from responder)",
                "S3", "Connection established and close attempt by responder seen (but no reply from originator)",
                "SF", "Normal SYN/FIN completion",
                "REJ", "Connection attempt rejected",
                "RSTO", "Connection established, originator aborted (sent a RST)",
                "RSTR", "Established, responder aborted",
                "RSTOS0", "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder",
                "RSTRH", "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator",
                "SH", "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)",
                  "SHR", "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator",
                "OTH", "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
                ]
}

mutate {
  convert => [ "id.orig_p", "integer" ]
  convert => [ "id.resp_p", "integer" ]
  convert => [ "orig_bytes", "integer" ]
  convert => [ "duration", "float" ]
  convert => [ "resp_bytes", "integer" ]
  convert => [ "missed_bytes", "integer" ]
  convert => [ "orig_pkts", "integer" ]
  convert => [ "orig_ip_bytes", "integer" ]
  convert => [ "resp_pkts", "integer" ]
  convert => [ "resp_ip_bytes", "integer" ]
  rename =>  [ "id.orig_h", "id_orig_host" ]
  rename =>  [ "id.orig_p", "id_orig_port" ]
  rename =>  [ "id.resp_h", "id_resp_host" ]
  rename =>  [ "id.resp_p", "id_resp_port" ]
}

}
}

output {

stdout { codec => rubydebug }

elasticsearch { hosts => ["localhost:9200"] }
}


(Ry Biesemeyer) #8

I can't quite tell because your formatting is getting mangled a bit, but it looks like you may have commented out the path directive (on these Discuss forums, wrapping text in code fences like ~~~ on a line before and a line after will signal to the discuss markdown processor that the bits in-between shouldn't include any additional formatting).

Do you have something like the following?

input {
  file {
    type => "bro-conn_log"
    start_position => "end"
    sincedb_path => "/var/tmp/.bro_conn_sincedb"
    #Edit the following path to reflect the location of your log files. You can also change the extension if you use something else
    path => "/nsm/bro/logs/current/conn.log"
  }
}

(Bz Os) #9

yes i have the same input that you posted


(Ry Biesemeyer) #10

do you have any other pipeline configurations that have a file input?


(Bz Os) #11

thanks yaauie the problem is fixed i was missing he path in the input file


(system) #12

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.