Logstash json filter not sending to root of event

I see that my event has a field "message", and that field holds a JSON object with several values, e.g. message: {"eventid":"cowrie.login.failed","username":"uno85",...}.

I would like all subvalues of message to be put at the root of the event (i.e at the same level as @timestamp for example).

According to the Logstash doc, this should happen by default when I use a JSON filter and do not specify the target field. However, this is my configuration (below), and it's obviously not working because username is within message, not at top level...

filter {
    if [type] == "cowrie" {
        json {
            source => "message"
        }
    }
}

You could use a ruby filter to extract the fields from message and put them at root level. Since message is a JSON string, it needs to be parsed first. Something like:

ruby {
  code => "
    require 'json'
    JSON.parse(event.get('message')).each { |k, v| event.set(k, v) }
    event.remove('message')
  "
}
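To see what that kind of loop does outside Logstash, here is a minimal, self-contained sketch of the idea. It uses a plain Ruby hash to stand in for the Logstash event (an assumption for illustration only; the real event API uses get/set/remove):

```ruby
require 'json'

# Plain hash standing in for the Logstash event (illustration only).
event = {
  '@timestamp' => '2020-03-12T15:45:52.624Z',
  'message'    => '{"eventid":"cowrie.login.failed","username":"uno85"}'
}

# Parse the JSON string held in "message" and promote each key/value
# pair to the root of the event, then drop the original field.
JSON.parse(event['message']).each { |k, v| event[k] = v }
event.delete('message')

puts event.keys.sort.join(', ')
# => @timestamp, eventid, username
```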

Anyway, if you post here the output of the pipeline as it is now (with only your filter applied), it'd be easier to help you.

OK, let me give more detail, because I'm not sure that ruby snippet will do the trick. This is what I get. Notice the message field? Inside, it has eventid, username, etc. I'd like eventid, username to be at the same level as _index.

{
  "_index": "filebeat-7.6.1-2020.03.10-000001",
  "_type": "_doc",
  "_id": "T_dsz3ABuJmadUjSadLZ",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2020-03-12T15:45:52.624Z",
    "log": {
      "offset": 7772652,
      "file": {
        "path": "/xxx/cowrie.json"
      }
    },
    "message": "{\"eventid\":\"cowrie.login.success\",\"username\":\"test\",\"password\":\"xxxx\",\"message\":\"login attempt [test/xxxx] succeeded\",\"sensor\":\"instance-39\",\"timestamp\":\"2020-03-12T16:45:49.274837Z\",\"src_ip\":\"IP address\",\"session\":\"c176278d5630\"}",
    "input": {
      "type": "log"
    },
    "ecs": {
      "version": "1.4.0"
    },
    "host": {
      "name": "instance-39"
    },
    "agent": {
      "type": "filebeat",
      "ephemeral_id": "07441667-3997-4039-b3bc-e48209f22e69",
      "hostname": "instance-39",
      "id": "f505e637-ece6-4dc5-ad66-5f986536eae4",
      "version": "7.6.1"
    }
  },
  "fields": {
    "suricata.eve.timestamp": [
      "2020-03-12T15:45:52.624Z"
    ],
    "@timestamp": [
      "2020-03-12T15:45:52.624Z"
    ]
  },
  "sort": [
    -9223372036854776000,
    1584027952624
  ]
}

Now, there are several strange things. Mostly, it's as if my configuration file were not read at all, even though /var/log/logstash/logstash-plain.log mentions the right file, "/etc/logstash/conf.d/logstash-cowrie.conf":

[2020-03-12T15:58:17,679][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/etc/logstash/conf.d/logstash-cowrie.conf"], :thread=>"#<Thread:0x60754869 run>"}

Here is my /etc/logstash/conf.d/logstash-cowrie.conf. The commented lines are things that I tried.

input {
    beats {
        port => 5044
        type => "cowrie"
    }
}

filter {
#    if [type] == "cowrie" {
        json {
            source => "message"
        }
        date {
            match => [ "timestamp", "ISO8601" ]
        }

        if [src_ip] {

            mutate {
                add_field => { "src_host" => "%{src_ip}" }
            }

            dns {
                reverse => [ "src_host" ]
                nameserver => [ "8.8.8.8", "8.8.4.4" ]
                action => "replace"
                hit_cache_size => 4096
                hit_cache_ttl => 900
                failed_cache_size => 512
                failed_cache_ttl => 900
            }

            geoip {
                source => "src_ip"
                target => "geoip"
                database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
                #add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
                #add_field => [ "[geoip][location]", "%{[geoip][latitude]}"  ]
            }

        }

        mutate {
            #convert => [ "[geoip][coordinates]", "float" ]
            remove_tag => [ "beats_input_codec_plain_applied" ]
            remove_field => [ "log.file.path", "log.offset" ]
        }
#    }
}

output {
#    if [type] == "cowrie" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{type}"
        }
        file {
            path => "/tmp/cowrie-logstash.json"
            codec => json
        }
        stdout {
            codec => rubydebug
        }
#    }
}

Several things are not normal:

  • message is not in the right place
  • I have a log.file.path field even though I ask to remove it
  • I get no src_host field, though it should be created since I have a src_ip
  • I have no /tmp/cowrie-logstash.json
  • should the line index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" create a new index each day, or only when I restart Logstash? In any case, I only have filebeat-2020.03.10 and filebeat-2020.03.09 (and we're March 12).
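On the log.file.path point, one thing worth checking: Logstash addresses nested fields with bracket syntax, so the string "log.file.path" in remove_field matches a field whose literal name contains dots, not the nested [log][file][path] structure that Filebeat produces. A sketch of the mutate block using bracket references (untested against this exact pipeline, so treat it as a suggestion):

```
mutate {
    # Bracket syntax addresses the nested field; a dotted string only
    # matches a field whose literal name contains the dots.
    remove_field => [ "[log][file][path]", "[log][offset]" ]
}
```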

This is driving me mad! :frowning:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.