[Solved] Can't get logstash to catch cowrie filebeat

I can't get the cowrie -> filebeat -> logstash -> elasticsearch -> kibana chain working.

Cowrie logs to a JSON file, which is read by Filebeat.

Filebeat version

$ /usr/share/filebeat/bin/filebeat version
filebeat version 6.1.1 (arm), libbeat 6.1.1

Filebeat config

$ sudo cat /etc/filebeat/filebeat.yml
filebeat.modules:

filebeat.prospectors:
- input_type: log
  type: log
  enabled: true
  paths:
    - /home/cowrie/cowrie/var/log/cowrie/cowrie.json*
  encoding: plain
  fields:
    document_type: cowrie

registry_file: /var/lib/filebeat/registry

output.logstash:
  hosts: ["192.168.10.6:5044"]

shipper:

logging:
  to_syslog: false
  to_files: true
  files:
    path: /var/log/filebeat/
    name: mybeat
    rotateeverybytes: 10485760 # = 10MB
    keepfiles: 7
  level: info

Logstash version

# /opt/logstash/bin/logstash --version
logstash 6.4.3

Logstash config

# cat /etc/logstash/conf.d/cowrie.conf
input {
  beats {
    type => "beats"
    port => 5044    # Pick an available port to listen on
    host => "0.0.0.0"
  }
}

filter {
  if [type] == "cowrie" {

    json {
      source => message
    }

    date {
      match => [ "timestamp", "ISO8601" ]
    }

    if [src_ip]  {
      dns {
        reverse => [ "src_host", "src_ip" ]
        action => "append"
      }
      geoip {
        source => "src_ip"  # With the src_ip field
        target => "geoip"   # Add the geoip one
        # Using the database we previously saved
        database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
      }
      # Get the ASN code as well
      #geoip {
      #  source => "src_ip"
      #  database => "/opt/logstash/vendor/geoip/GeoIPASNum.dat"
      #}
      mutate {
        convert => [ "[geoip][coordinates]", "float" ]
      }
    }
  }
}

output {
  if [type] == "cowrie" {
    # Output to elasticsearch
    elasticsearch {
      hosts => ["127.0.0.1:9200"]  # Provided elasticsearch is listening on that host:port
      #sniffing => true
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
    file {
      path => "/tmp/cowrie-logstash.log"
      codec => json
    }
    # For debugging
    stdout {
      codec => rubydebug
    }
  }
}
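As an aside on what the filter is supposed to do: the json filter with source => message parses the JSON string held in the message field and merges the resulting keys into the event. A rough Python sketch of that behavior (for illustration only, using a sample event shaped like the ones in my debug log below, not Logstash itself):

```python
import json

# Rough sketch of what Logstash's json filter does with source => message:
# parse the JSON string in the "message" field and merge its keys into the event.
event = {
    "type": "beats",
    "message": '{"eventid": "cowrie.direct-tcpip.request", '
               '"src_ip": "5.188.86.208", "dst_port": 443}',
}
event.update(json.loads(event["message"]))

print(event["src_ip"])    # -> 5.188.86.208
print(event["dst_port"])  # -> 443
```

So fields like src_ip should only appear on the event after the json filter has run, which is why the conditional guarding it matters.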

Logstash seems to be picking up something

Logstash logfile excerpt

...
[DEBUG] 2019-01-26 21:54:14.493 [Ruby-0-Thread-8: :1] pipeline - filter received {"event"=>{"host"=>"hunnipi", "source"=>"/home/cowrie/cowrie/var/log/cowrie/cowrie.json", "@version"=>"1", "message"=>"{\"eventid\": \"cowrie.direct-tcpip.request\", \"timestamp\": \"2019-01-26T20:54:12.854963Z\", \"dst_ip\": \"ya.ru\", \"src_ip\": \"5.188.86.208\", \"session\": \"72993b90b7b8\", \"dst_port\": 443, \"src_port\": 0, \"message\": \"direct-tcp connection request to ya.ru:443 from ::1:0\", \"sensor\": \"hunnipi\"}", "tags"=>["beats_input_codec_plain_applied"], "type"=>"beats", "beat"=>{"version"=>"6.1.1", "name"=>"hunnipi", "hostname"=>"hunnipi"}, "@timestamp"=>2019-01-26T20:54:13.347Z, "offset"=>53203349, "prospector"=>{"type"=>"log"}, "fields"=>{"document_type"=>"cowrie"}}}
[DEBUG] 2019-01-26 21:54:14.494 [Ruby-0-Thread-8: :1] pipeline - output received {"event"=>{"host"=>"hunnipi", "source"=>"/home/cowrie/cowrie/var/log/cowrie/cowrie.json", "@version"=>"1", "message"=>"{\"eventid\": \"cowrie.direct-tcpip.request\", \"timestamp\": \"2019-01-26T20:54:12.854963Z\", \"dst_ip\": \"ya.ru\", \"src_ip\": \"5.188.86.208\", \"session\": \"72993b90b7b8\", \"dst_port\": 443, \"src_port\": 0, \"message\": \"direct-tcp connection request to ya.ru:443 from ::1:0\", \"sensor\": \"hunnipi\"}", "tags"=>["beats_input_codec_plain_applied"], "type"=>"beats", "beat"=>{"version"=>"6.1.1", "name"=>"hunnipi", "hostname"=>"hunnipi"}, "@timestamp"=>2019-01-26T20:54:13.347Z, "offset"=>53203349, "prospector"=>{"type"=>"log"}, "fields"=>{"document_type"=>"cowrie"}}}
...

But no matching filebeat index shows up in Elasticsearch:

# curl 'http://localhost:9200/_cat/indices?v'
health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   logstash-2019.01.26 HZ9AdcbSSM69MQw-4xXDhA   5   1        241            0    232.7kb        232.7kb
green  open   .kibana             _GwWwwVuS_O-wA3mEkPenw   1   0          2            0     10.7kb         10.7kb

Thanks
/jon

Your messages have the type field set to beats, so none of the conditionals match.
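To illustrate with the debug output above (a simplified Python sketch of the event layout, not actual Logstash code): the beats input sets type to whatever is configured on the input, while Filebeat's custom fields: entries land nested under a top-level fields key.

```python
# Simplified sketch of the event shown in the debug log above:
event = {
    "type": "beats",                         # set by the beats input config
    "fields": {"document_type": "cowrie"},   # Filebeat's custom fields: block
    "message": '{"eventid": "cowrie.direct-tcpip.request"}',
}

# [type] == "cowrie" never matches...
print(event["type"] == "cowrie")                     # -> False
# ...but [fields][document_type] == "cowrie" does.
print(event["fields"]["document_type"] == "cowrie")  # -> True
```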

Thanks for the reply

I edited the logstash config as suggested, but still no luck.

# cat /etc/logstash/conf.d/cowrie.conf
input {
  beats {
    port => 5044    # Pick an available port to listen on
    host => "0.0.0.0"
  }
} 
 
filter {
  if [type] == "cowrie" {
    json {
      source => message
    }
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    if [src_ip]  {
      dns {
        reverse => [ "src_host", "src_ip" ]
        action => "append"
      }
      geoip {
        source => "src_ip"  # With the src_ip field
        target => "geoip"   # Add the geoip one
        # Using the database we previously saved
        database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
      }
      # Get the ASN code as well
      #geoip {
        #source => "src_ip"
        #database => "/opt/logstash/vendor/geoip/GeoIPASNum.dat"
      #}
      mutate {
        convert => [ "[geoip][coordinates]", "float" ]
      }
    }
  }
}

output {
  if [type] == "cowrie" {
    # Output to elasticsearch
    elasticsearch {
      hosts => ["127.0.0.1:9200"]  # Provided elasticsearch is listening on that host:port
      #sniffing => true
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
    file {
      path => "/tmp/cowrie-logstash.log"
      codec => json
    }
    # For debugging
    stdout {
      codec => rubydebug
    }
  }
}

The logstash log still shows plenty of events arriving from filebeat:

...
[DEBUG] 2019-01-26 22:38:40.673 [Ruby-0-Thread-6: :1] file - Starting flush cycle
[DEBUG] 2019-01-26 22:38:41.315 [pool-2-thread-2] cgroup - One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[DEBUG] 2019-01-26 22:38:41.683 [nioEventLoopGroup-3-3] ConnectionHandler - e39a15a6: batches pending: true
[DEBUG] 2019-01-26 22:38:41.683 [defaultEventExecutorGroup-5-2] BeatsHandler - [local: 192.168.10.6:5044, remote: 192.168.10.48:35816] Received a new payload
[DEBUG] 2019-01-26 22:38:41.684 [defaultEventExecutorGroup-5-2] BeatsHandler - [local: 192.168.10.6:5044, remote: 192.168.10.48:35816] Sending a new message for the listener, sequence: 1
[DEBUG] 2019-01-26 22:38:41.685 [defaultEventExecutorGroup-5-2] BeatsHandler - [local: 192.168.10.6:5044, remote: 192.168.10.48:35816] Sending a new message for the listener, sequence: 2
[DEBUG] 2019-01-26 22:38:41.686 [defaultEventExecutorGroup-5-2] BeatsHandler - e39a15a6: batches pending: false
[DEBUG] 2019-01-26 22:38:41.803 [Ruby-0-Thread-8: :1] pipeline - filter received {"event"=>{"offset"=>55453829, "source"=>"/home/cowrie/cowrie/var/log/cowrie/cowrie.json", "@timestamp"=>2019-01-26T21:38:40.670Z, "@version"=>"1", "prospector"=>{"type"=>"log"}, "tags"=>["beats_input_codec_plain_applied"], "host"=>"hunnipi", "beat"=>{"hostname"=>"hunnipi", "name"=>"hunnipi", "version"=>"6.1.1"}, "message"=>"{\"eventid\": \"cowrie.direct-tcpip.request\", \"timestamp\": \"2019-01-26T21:38:38.864972Z\", \"dst_ip\": \"31.13.67.174\", \"src_ip\": \"5.188.86.208\", \"session\": \"a83e0879efdd\", \"dst_port\": 443, \"src_port\": 31373, \"message\": \"direct-tcp connection request to 31.13.67.174:443 from ::1:31373\", \"sensor\": \"hunnipi\"}"}}
[DEBUG] 2019-01-26 22:38:41.803 [Ruby-0-Thread-7: :1] pipeline - filter received {"event"=>{"offset"=>55456652, "source"=>"/home/cowrie/cowrie/var/log/cowrie/cowrie.json", "@timestamp"=>2019-01-26T21:38:40.670Z, "@version"=>"1", "prospector"=>{"type"=>"log"}, "tags"=>["beats_input_codec_plain_applied"], "host"=>"hunnipi", "beat"=>{"hostname"=>"hunnipi", "name"=>"hunnipi", "version"=>"6.1.1"}, "message"=>"{\"eventid\": \"cowrie.direct-tcpip.data\", \"timestamp\": \"2019-01-26T21:38:39.015163Z\", \"sensor\": \"hunnipi\", \"src_ip\": \"5.188.86.208\", \"session\": \"a83e0879efdd\", \"dst_port\": 443, \"dst_ip\": \"31.13.67.174\", \"data\": 
...
[DEBUG] 2019-01-26 22:38:41.804 [Ruby-0-Thread-8: :1] pipeline - output received {"event"=>{"offset"=>55453829, "source"=>"/home/cowrie/cowrie/var/log/cowrie/cowrie.json", "@timestamp"=>2019-01-26T21:38:40.670Z, "@version"=>"1", "prospector"=>{"type"=>"log"}, "tags"=>["beats_input_codec_plain_applied"], "host"=>"hunnipi", "beat"=>{"hostname"=>"hunnipi", "name"=>"hunnipi", "version"=>"6.1.1"}, "message"=>"{\"eventid\": \"cowrie.direct-tcpip.request\", \"timestamp\": \"2019-01-26T21:38:38.864972Z\", \"dst_ip\": \"31.13.67.174\", \"src_ip\": \"5.188.86.208\", \"session\": \"a83e0879efdd\", \"dst_port\": 443, \"src_port\": 31373, \"message\": \"direct-tcp connection request to 31.13.67.174:443 from ::1:31373\", \"sensor\": \"hunnipi\"}"}}
...
[DEBUG] 2019-01-26 22:38:42.419 [pool-2-thread-1] jvm - collector name {:name=>"ParNew"}
[DEBUG] 2019-01-26 22:38:42.419 [pool-2-thread-1] jvm - collector name {:name=>"ConcurrentMarkSweep"}
[DEBUG] 2019-01-26 22:38:42.683 [Ruby-0-Thread-6: :1] file - Starting flush cycle
...

But I can't see any events with type: cowrie?

Is something wrong with the filebeat config?

Thanks Badger!

You put me on the right path with your reply.
Been staring at this for two days...

Finally this logstash config worked

# cat /etc/logstash/conf.d/cowrie.conf
input {
  beats {
    port => 5044    # Pick an available port to listen on
    host => "0.0.0.0"
  }
}

filter {
  if [fields][document_type] == "cowrie" {
    json {
      source => message
    }
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    if [src_ip]  {
      dns {
        reverse => [ "src_host", "src_ip" ]
        action => "append"
      }
      geoip {
        source => "src_ip"  # With the src_ip field
        target => "geoip"   # Add the geoip one
        # Using the database we previously saved
        database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
      }
      # Get the ASN code as well
      #geoip {
        #source => "src_ip"
        #database => "/opt/logstash/vendor/geoip/GeoIPASNum.dat"
      #}
      mutate {
        convert => [ "[geoip][coordinates]", "float" ]
      }
    }
  }
}

output {
  if [fields][document_type] == "cowrie" {
    # Output to elasticsearch
    elasticsearch {
      hosts => ["127.0.0.1:9200"]  # Provided elasticsearch is listening on that host:port
      #sniffing => true
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
    file {
      path => "/tmp/cowrie-logstash.log"
      codec => json
    }
    # For debugging
    stdout {
      codec => rubydebug
    }
  }
}

It was the line

if [fields][document_type] == "cowrie"

that made it work.
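For anyone hitting the same thing: Filebeat nests custom fields: entries under a top-level fields key by default. An alternative I did not test here would be to set fields_under_root: true on the Filebeat side, which promotes document_type to the top level of the event, so the Logstash conditional could then be if [document_type] == "cowrie" instead:

```yaml
# Hypothetical alternative on the Filebeat side (not the config I used above):
filebeat.prospectors:
- input_type: log
  paths:
    - /home/cowrie/cowrie/var/log/cowrie/cowrie.json*
  fields:
    document_type: cowrie
  fields_under_root: true   # promote document_type to the event's top level
```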

/jon

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.