# Kafka error

# Filter stage for events tagged type == "BRO".
#
# Steps, in order:
#   1. Rename "_path" to "log_type"; every later branch keys off log_type.
#   2. Set the event timestamp from "ts".
#   3. Rename the originator/responder fields to the src_*/resp_* names.
#   4. Per log_type enrichment: conn_state description + numeric conversion +
#      GeoIP for "conn", GeoIP for "notice", version renames for "ssl"/"ssh".
#   5. Add reputation-lookup URLs for addresses outside the excluded ranges.
#   6. Build "url_full" for the http_eth* log types.
filter {
  if [type] == "BRO" {
    mutate {
      rename => { "_path" => "log_type" }
    }

    # NOTE(review): UNIX_MS expects "ts" in epoch *milliseconds*. If the
    # producer emits epoch seconds (possibly fractional), this must be "UNIX"
    # instead -- confirm against the actual sender configuration.
    date {
      match => ["ts", "UNIX_MS"]
    }

    # Rename first: every step below (convert, geoip, lookup URLs) refers to
    # the src_*/resp_* names, so they have to exist before those steps run.
    mutate {
      rename => {
        "id.orig_h"  => "src_ip"
        "id.orig_p"  => "src_port"
        "id.resp_h"  => "resp_ip"
        "id.resp_p"  => "resp_port"
        "orig_bytes" => "src_bytes"
        "orig_pkts"  => "src_packets"
        "orig_cc"    => "src_cc"
      }
    }

    if [log_type] == "conn" {
      # Expand the short conn_state code into a readable description.
      translate {
        field => "conn_state"
        destination => "conn_state_full"
        dictionary => {
          "S0" => "Connection attempt seen, no reply"
          "S1" => "Connection established, not terminated"
          "S2" => "Connection established and close attempt by originator seen (but no reply from responder)"
          "S3" => "Connection established and close attempt by responder seen (but no reply from originator)"
          "SF" => "Normal SYN/FIN completion"
          "REJ" => "Connection attempt rejected"
          "RSTO" => "Connection established, originator aborted (sent a RST)"
          "RSTR" => "Established, responder aborted"
          "RSTOS0" => "Originator sent a SYN followed by a RST, we never saw a SYN-ACK from the responder"
          "RSTRH" => "Responder sent a SYN ACK followed by a RST, we never saw a SYN from the (purported) originator"
          "SH" => "Originator sent a SYN followed by a FIN, we never saw a SYN ACK from the responder (hence the connection was 'half' open)"
          "SHR" => "Responder sent a SYN ACK followed by a FIN, we never saw a SYN from the originator"
          "OTH" => "No SYN seen, just midstream traffic (a 'partial connection' that was not later closed)"
        }
      }

      # Numeric counters and ports only. src_ip is deliberately NOT converted:
      # it is an address string that geoip and the lookup URLs below need
      # intact. The packet counter is addressed by its post-rename name,
      # "src_packets".
      mutate {
        convert => {
          "src_port"      => "integer"
          "resp_port"     => "integer"
          "src_bytes"     => "integer"
          "resp_bytes"    => "integer"
          "missed_bytes"  => "integer"
          "src_packets"   => "integer"
          "orig_ip_bytes" => "integer"
          "resp_pkts"     => "integer"
          "resp_ip_bytes" => "integer"
        }
      }

      geoip {
        source => "src_ip"
        target => "geoip_src"
      }
      geoip {
        source => "resp_ip"
        target => "geoip_resp"
      }
    }
    else if [log_type] == "notice" {
      # notice events are geolocated from their "src"/"dst" fields.
      geoip {
        source => "src"
        target => "geoip_src"
      }
      geoip {
        source => "dst"
        target => "geoip_resp"
      }
    }

    # "version" is renamed per protocol so the ssl and ssh values end up in
    # distinct fields.
    if [log_type] == "ssl" {
      mutate {
        rename => { "version" => "ssl_version" }
      }
    }
    if [log_type] == "ssh" {
      mutate {
        rename => { "version" => "ssh_version" }
      }
    }

    if [log_type] in ["conn","notice","intel","dns","http_eth1","http_eth2","http_eth3","http_eth4","http_eth5","weird","ssl","ssh","syslog"] {
      # Skip loopback, RFC 1918 private, link-local and 224.0.0.x addresses.
      # The leading existence test keeps events that have no src_ip at all
      # from receiving URLs containing an unexpanded "%{src_ip}".
      if [src_ip] and [src_ip] !~"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)|(^224\.0\.0\.)" {
        mutate {
          add_field => {
            "src_Senderbase_lookup"  => "http://www.senderbase.org/lookup/?search_string=%{src_ip}"
            "src_CBL_lookup"         => "http://cbl.abuseat.org/lookup.cgi?ip=%{src_ip}"
            "src_Spamhaus_lookup"    => "http://www.spamhaus.org/query/bl?ip=%{src_ip}"
            "src_DomainTools_lookup" => "http://whois.domaintools.com/%{src_ip}"
          }
        }
      }
      # Same exclusion list and existence guard for the responder address.
      if [resp_ip] and [resp_ip] !~"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)|(^224\.0\.0\.)" {
        mutate {
          add_field => {
            "resp_Senderbase_lookup"  => "http://www.senderbase.org/lookup/?search_string=%{resp_ip}"
            "resp_CBL_lookup"         => "http://cbl.abuseat.org/lookup.cgi?ip=%{resp_ip}"
            "resp_Spamhaus_lookup"    => "http://www.spamhaus.org/query/bl?ip=%{resp_ip}"
            "resp_DomainTools_lookup" => "http://whois.domaintools.com/%{resp_ip}"
          }
        }
      }
    }

    # Concatenate "host" and "uri" into one field for the HTTP log types.
    if [log_type] in ["http_eth1","http_eth2","http_eth3","http_eth4","http_eth5"] {
      mutate {
        add_field => { "url_full" => "%{host}%{uri}" }
      }
    }
  }
}