Cannot create a pipeline to send Bro logs to ELK

Configuration file: filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - "/usr/local/bro/logs/current/conn.log"
  fields:
    # must match the [type] conditional in the Logstash filter below
    type: "bro-conn_log"
  fields_under_root: true

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["logstash:5044"]

And the Logstash pipeline config, bro-conn-log.conf:

> 
> input {
>   beats {
>     port => 5044
>     # 'columns' is a csv filter option, not a beats input option; an unknown
>     # setting here is rejected when Logstash builds the pipeline. To change
>     # the bind address, the beats input option is 'host'.
>   }
> }
> 
> filter {
> 
>   #Let's get rid of those header lines; they begin with a hash
>   if [message] =~ /^#/ {
>     drop { }
>   }
> 
>   #Now, using the csv filter, we can define the Bro log fields
>   if [type] == "bro-conn_log" {
>     csv {
>       columns => ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration","orig_bytes","resp_bytes","conn_state","local_orig","missed_bytes","history","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","tunnel_parents"]
> 
>       #If you use a custom delimiter, change the following value between the quotes to match it.
>       #Otherwise, insert a literal <tab> between the two quotes; on your Logstash system,
>       #use a text editor like nano that doesn't convert tabs to spaces.
>       separator => "	"
>     }
> 
>     #Let's parse the 'ts' field as a UNIX epoch timestamp, so Kibana can use @timestamp natively
>     date {
>       match => [ "ts", "UNIX" ]
>     }
> 
>     # add geoip attributes
>     geoip {
>       source => "id.orig_h"
>       target => "orig_geoip"
>     }
>     geoip {
>       source => "id.resp_h"
>       target => "resp_geoip"
>     }
> 
>     #The following makes use of the translate filter (logstash contrib) to convert conn_state into human text. Saves having to look up values for packet introspection
>     translate {
>       field => "conn_state"
>       destination => "conn_state_full"
>
>       #The hash form below is the documented dictionary syntax; the older
>       #flat-array form is deprecated in recent versions of the plugin
>       dictionary => {
>         "S0"     => "Connection attempt seen, no reply"
>         "S1"     => "Connection established, not terminated"
>         "S2"     => "Connection established and close attempt by originator seen (but no reply from responder)"
>         "S3"     => "Connection established and close attempt by responder seen (but no reply from originator)"
>         "SF"     => "Normal SYN/FIN completion"
>         "REJ"    => "Connection attempt rejected"
>         "RSTO"   => "Connection established, originator aborted (sent a RST)"
>         "RSTR"   => "Established, responder aborted"
>         "RSTOS0" => "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder"
>         "RSTRH"  => "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator"
>         "SH"     => "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)"
>         "SHR"    => "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator"
>         "OTH"    => "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
>       }
>     }
> 
>     #Some Elasticsearch versions don't accept dots in field names, so rename these first
>     mutate {
>       rename => {
>         "id.orig_h" => "id_orig_host"
>         "id.orig_p" => "id_orig_port"
>         "id.resp_h" => "id_resp_host"
>         "id.resp_p" => "id_resp_port"
>       }
>     }
>
>     #Within a single mutate block, rename always runs before convert regardless
>     #of the order written, so convert in a second block using the new names
>     mutate {
>       convert => {
>         "id_orig_port"  => "integer"
>         "id_resp_port"  => "integer"
>         "duration"      => "float"
>         "orig_bytes"    => "integer"
>         "resp_bytes"    => "integer"
>         "missed_bytes"  => "integer"
>         "orig_pkts"     => "integer"
>         "orig_ip_bytes" => "integer"
>         "resp_pkts"     => "integer"
>         "resp_ip_bytes" => "integer"
>       }
>     }
>   }
> }
> 
> output {
>   # stdout { codec => rubydebug }
>   elasticsearch {
>     hosts => ["localhost:9200"]
>   }
> }
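Logstash validates every plugin option while building the pipeline, so the stray columns setting in the beats input is exactly the kind of thing that produces a "cannot create pipeline" error. With the config corrected, you can verify it without starting the service; the paths below are the usual package-install defaults, so adjust them to your setup:

/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/bro-conn-log.conf

If the config validates but events still arrive unparsed in Kibana, uncomment the stdout { codec => rubydebug } line in the output block to watch events as they leave the filter, and double-check that the type set by Filebeat matches the [type] conditional in the filter.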