My field `input` is modified, and some events are filtered (against my will ;-))

(Edited on March 31 with example)

I have a working Filebeat+ELK config used to process logs from Cowrie (a honeypot).
However, some Cowrie log entries never show up in Kibana, as if they had been filtered out (by Logstash?).

Example: my Cowrie JSON log has entries with a keyword `input` that I am particularly interested in.

{  "eventid":"cowrie.command.input",
   "input":"cat /proc/mounts; /bin/busybox EOVZJ",
   "message":"CMD: cat /proc/mounts; /bin/busybox EOVZJ",
   "sensor":"instance-42",
   "timestamp":"2020-03-31T10:25:55.077589Z",
   "src_ip":"xxxx",
   "session":"c4b3d3dc3100"}

I see the corresponding log entry in Logstash's journal via journalctl (see below for the full log):

  • eventid is present and contains cowrie.command.input
  • message is present and contains the cat /proc/mounts ... commands
  • input is present and contains the commands

Good. But when I search in Kibana (see below for screenshot):

  • I do have an input field, but it no longer contains the command: it has been overwritten with the Filebeat input type, which is log :frowning:
  • I don't have any cowrie.command.input event during that timeframe (even though there is no specific search filter). :rage:

Where is the error, please? What is changing/filtering the input/message/eventid fields?
Thanks

This is the Logstash journal:

Mar 31 10:26:02 instance-42 logstash[22592]: {
Mar 31 10:26:02 instance-42 logstash[22592]:        "eventid" => "cowrie.command.input",
Mar 31 10:26:02 instance-42 logstash[22592]:          "agent" => {
Mar 31 10:26:02 instance-42 logstash[22592]:             "hostname" => "instance-42",
Mar 31 10:26:02 instance-42 logstash[22592]:              "id" => "78af...",
Mar 31 10:26:02 instance-42 logstash[22592]:              "ephemeral_id" => "b2..",
Mar 31 10:26:02 instance-42 logstash[22592]:               "type" => "filebeat",
Mar 31 10:26:02 instance-42 logstash[22592]:              "version" => "7.6.1"
Mar 31 10:26:02 instance-42 logstash[22592]:          },
Mar 31 10:26:02 instance-42 logstash[22592]:          "geoip" => {
Mar 31 10:26:02 instance-42 logstash[22592]:               "timezone" => "xxxx",
Mar 31 10:26:02 instance-42 logstash[22592]:                "ip" => "xxxx",
Mar 31 10:26:02 instance-42 logstash[22592]:                "latitude" => xxx,
Mar 31 10:26:02 instance-42 logstash[22592]:                "continent_code" => "xx",
Mar 31 10:26:02 instance-42 logstash[22592]:                "city_name" => "xxxx",
...
Mar 31 10:26:02 instance-42 logstash[22592]:               "location" => {
Mar 31 10:26:02 instance-42 logstash[22592]:                     "lon" => x,
Mar 31 10:26:02 instance-42 logstash[22592]:                     "lat" => x
Mar 31 10:26:02 instance-42 logstash[22592]:                },
Mar 31 10:26:02 instance-42 logstash[22592]:              "region_name" => "xxxx",
..
Mar 31 10:26:02 instance-42 logstash[22592]:              "longitude" => xxx
Mar 31 10:26:02 instance-42 logstash[22592]:           },
Mar 31 10:26:02 instance-42 logstash[22592]:           "log" => { 
Mar 31 10:26:02 instance-42 logstash[22592]:              "file" => {}
Mar 31 10:26:02 instance-42 logstash[22592]:           },
Mar 31 10:26:02 instance-42 logstash[22592]:           "session" => "c4b3d3dc3100",
Mar 31 10:26:02 instance-42 logstash[22592]:           "@metadata" => {
Mar 31 10:26:02 instance-42 logstash[22592]:               "type" => "_doc",
Mar 31 10:26:02 instance-42 logstash[22592]:               "beat" => "filebeat",
Mar 31 10:26:02 instance-42 logstash[22592]:               "version" => "7.6.1",
Mar 31 10:26:02 instance-42 logstash[22592]:               "ip_address" => "127.0.0.1"
Mar 31 10:26:02 instance-42 logstash[22592]:            },
Mar 31 10:26:02 instance-42 logstash[22592]:           "src_host" => "xxxxxxxxxx",
Mar 31 10:26:02 instance-42 logstash[22592]:           "message" => "CMD: cat /proc/mounts; /bin/busybox EOVZJ",
Mar 31 10:26:02 instance-42 logstash[22592]:           "type" => "cowrie",
Mar 31 10:26:02 instance-42 logstash[22592]:           "tags" => [],
Mar 31 10:26:02 instance-42 logstash[22592]:           "cloud" => {
Mar 31 10:26:02 instance-42 logstash[22592]:                  "availability_zone" => "europe-west1-b",
Mar 31 10:26:02 instance-42 logstash[22592]:                  "instance" => {
Mar 31 10:26:02 instance-42 logstash[22592]:                      "name" => "instance-42",
Mar 31 10:26:02 instance-42 logstash[22592]:                      "id" => "xxxxxxxxxxx"
Mar 31 10:26:02 instance-42 logstash[22592]:                   },
Mar 31 10:26:02 instance-42 logstash[22592]:                  "provider" => "xxx",
Mar 31 10:26:02 instance-42 logstash[22592]:                  "machine" => {
Mar 31 10:26:02 instance-42 logstash[22592]:                      "type" => "xxx"
Mar 31 10:26:02 instance-42 logstash[22592]:                   },
Mar 31 10:26:02 instance-42 logstash[22592]:                  "project" => {
Mar 31 10:26:02 instance-42 logstash[22592]:                       "id" => "xxxxxxxxx"
Mar 31 10:26:02 instance-42 logstash[22592]:                   }
Mar 31 10:26:02 instance-42 logstash[22592]:            },
Mar 31 10:26:02 instance-42 logstash[22592]:           "src_ip" => "xxxxxxx",
Mar 31 10:26:02 instance-42 logstash[22592]:           "input" => "cat /proc/mounts; /bin/busybox EOVZJ",
Mar 31 10:26:02 instance-42 logstash[22592]:           "@timestamp" => 2020-03-31T10:25:55.077Z,
Mar 31 10:26:02 instance-42 logstash[22592]:           "ecs" => {
Mar 31 10:26:02 instance-42 logstash[22592]:               "version" => "1.4.0"
Mar 31 10:26:02 instance-42 logstash[22592]:            },
Mar 31 10:26:02 instance-42 logstash[22592]:           "host" => {
Mar 31 10:26:02 instance-42 logstash[22592]:              "hostname" => "instance-42",
Mar 31 10:26:02 instance-42 logstash[22592]:               "os" => {
Mar 31 10:26:02 instance-42 logstash[22592]:                      "kernel" => "4.19.0-8-cloud-amd64",
Mar 31 10:26:02 instance-42 logstash[22592]:                      "codename" => "buster",
Mar 31 10:26:02 instance-42 logstash[22592]:                      "name" => "Debian GNU/Linux",
Mar 31 10:26:02 instance-42 logstash[22592]:                      "family" => "debian",
Mar 31 10:26:02 instance-42 logstash[22592]:                      "version" => "10 (buster)",
Mar 31 10:26:02 instance-42 logstash[22592]:                      "platform" => "debian"
Mar 31 10:26:02 instance-42 logstash[22592]:                },
Mar 31 10:26:02 instance-42 logstash[22592]:            "containerized" => false,
Mar 31 10:26:02 instance-42 logstash[22592]:            "name" => "instance-42",
Mar 31 10:26:02 instance-42 logstash[22592]:           "architecture" => "x86_64"
Mar 31 10:26:02 instance-42 logstash[22592]:           },
Mar 31 10:26:02 instance-42 logstash[22592]:           "@version" => "1",
Mar 31 10:26:02 instance-42 logstash[22592]:           "sensor" => "instance-42",
Mar 31 10:26:02 instance-42 logstash[22592]:           "timestamp" => "2020-03-31T10:25:55.077589Z"
Mar 31 10:26:02 instance-42 logstash[22592]: }

This is what I see in Kibana:

FYI, I am using 7.6.1. Filebeat is configured to read my Cowrie JSON log and send it to Logstash. No filtering at this level (AFAIK).

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/user/cowrie/var/log/cowrie/cowrie.json*
...
output.logstash:
  enabled: true
  # The Logstash hosts
  hosts: ["localhost:5044"]

Any filtering/modification would occur at the Logstash level. See my config below: I see nothing that would remove those eventid or input fields :frowning:

This is my Logstash config:

input {
    beats {
        port => 5044
        type => "cowrie"
    }
}

filter {
    if [type] == "cowrie" {
        json {
            source => "message"
        }
        date {
            match => [ "timestamp", "ISO8601" ]
        }

        if [src_ip] {

            mutate {
                add_field => { "src_host" => "%{src_ip}" }
            }

            dns {
                reverse => [ "src_host" ]
                nameserver => [ "8.8.8.8", "8.8.4.4" ]
                action => "replace"
                hit_cache_size => 4096
                hit_cache_ttl => 900
                failed_cache_size => 512
                failed_cache_ttl => 900
            }

            geoip {
                source => "src_ip"
                target => "geoip"
                database => "/opt/logstash/vendor/geoip/GeoLite2-City_20200317/GeoLite2-City.mmdb"
            }

        }

        mutate {
            remove_tag => [ "beats_input_codec_plain_applied" ]
            remove_field => [ "[log][file][path]", "[log][offset]" ]
        }
    }
}

output {
    if [type] == "cowrie" {
        elasticsearch {
            hosts => ["localhost:9200"]
            ilm_enabled => "auto"
            ilm_rollover_alias => "cowrie-logstash"
        }
        stdout {
            codec => rubydebug { metadata => true }
        }
    }
}

I fixed this. It was a Logstash config issue. In the Logstash journal, I noticed this type of log line, always with the same error message:

Mar 31 05:44:14 instance-42 logstash[22592]: [2020-03-31T03:44:14,883][WARN ][logstash.outputs.elasticsearch][main] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"cowrie-logstash", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x2082e9af>], :response=>{"index"=>{"_index"=>"cowrie-logstash-2020.03.20-000001", "_type"=>"_doc", "_id"=>"Go-wLnEBYpA6orC0jiYT", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [input] tried to parse field [input] as object, but found a concrete value"}}}}

So I had a look at my entries and found out that in some cases, the input field was an object with subfields,

"input" => {
        "type" => "log"
    },

and in other cases, directly a string:

"input" => "cat /proc/mounts; /bin/busybox EOVZJ"

I searched for the problem (in particular this thread) and found out that this is a mapping conflict on the Elasticsearch side: once the index has mapped input as an object (the first case), Elasticsearch rejects any document where input is a plain string (the second case), which is exactly what the mapper_parsing_exception above says.

Solution: I need to either always have an object or always a string for input, but never a mix.
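For reference, one generic way to enforce that (a sketch I did not end up using; it assumes the object variant is Filebeat's own input.type field, and the [filebeat][input] name is just an illustration) would be to move the object out of the way before parsing:

filter {
    # Sketch only: free up the top-level "input" name before the json filter
    # runs, so the string parsed out of the Cowrie JSON is the only value
    # that ever lands in "input".
    mutate {
        rename => { "input" => "[filebeat][input]" }
    }
    json {
        source => "message"
    }
}

Note that the existing index keeps its old object mapping either way, so such a change only takes effect on a fresh index (or after a rollover).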

The right fix will depend on your log structure. In my case (Cowrie), I was doing

json {
 source => "message"
}

which reads the JSON from the message field and builds the event structure from it. In my case, that was overwriting several fields already set by Filebeat, such as input (Filebeat's log input reports input.type, which is presumably where the object shown above comes from). The solution is to specify a target, so that fields from the parsed JSON don't overwrite other fields; for instance, they go into honeypot.input instead of input.

json {
 source => "message"
 target => "honeypot"
}
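One caveat (a sketch based on my config above; the nested paths simply follow from target => "honeypot"): every later filter that touches a parsed field now has to use the nested path.

filter {
    if [type] == "cowrie" {
        # All Cowrie fields now land under "honeypot.*" ...
        json {
            source => "message"
            target => "honeypot"
        }
        # ... so downstream filters must reference the nested paths.
        date {
            match => [ "[honeypot][timestamp]", "ISO8601" ]
        }
        if [honeypot][src_ip] {
            mutate {
                add_field => { "src_host" => "%{[honeypot][src_ip]}" }
            }
        }
    }
}

With that in place, the commands show up again in Kibana, under honeypot.input.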
