Logstash json filter not sending to root of event

I see that my event has a field "message", and that field is a JSON object with several values, e.g. message: {"eventid":"cowrie.login.failed","username":"uno85",...}.

I would like all subvalues of message to be put at the root of the event (i.e. at the same level as @timestamp, for example).

According to the Logstash docs, this should happen by default when I use a json filter and do not specify the target field. However, with my configuration (below), it's clearly not working: username stays inside message instead of ending up at the top level...

filter {
    if [type] == "cowrie" {
        json {
            source => "message"
        }
    }
}

You should use a ruby filter to extract your nested fields and put them at root level. Something like:

ruby {
  code => "
    event.get('message').each { |k, v| event.set(k,v) }
    event.remove('message')
  "
}
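For reference, the event API calls above behave like ordinary Hash operations. A minimal sketch of the same flatten in plain Ruby, assuming [message] has already been parsed into a hash (the event here is a stand-in, not the real Logstash Event object):

```ruby
# Simulated Logstash event whose "message" field is already a parsed hash.
event = {
  "@timestamp" => "2020-03-12T15:45:52.624Z",
  "message"    => { "eventid" => "cowrie.login.failed", "username" => "uno85" }
}

# Copy every key/value from "message" to the event root, then drop "message".
event["message"].each { |k, v| event[k] = v }
event.delete("message")
```

Note this only works if [message] is already an object; if it is still a raw JSON string, the json filter has to parse it first.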

Anyway, if you post the current output of the pipeline here (with only your filter applied), it'll be easier to help you.


OK, let me give more detail, because I'm not sure that ruby snippet will do the trick. This is what I get. Notice the message field? Inside, it has eventid, username, etc. I'd like eventid and username to be at the same level as _index.

{
  "_index": "filebeat-7.6.1-2020.03.10-000001",
  "_type": "_doc",
  "_id": "T_dsz3ABuJmadUjSadLZ",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2020-03-12T15:45:52.624Z",
    "log": {
      "offset": 7772652,
      "file": {
        "path": "/xxx/cowrie.json"
      }
    },
    "message": "{\"eventid\":\"cowrie.login.success\",\"username\":\"test\",\"password\":\"xxxx\",\"message\":\"login attempt [test/xxxx] succeeded\",\"sensor\":\"instance-39\",\"timestamp\":\"2020-03-12T16:45:49.274837Z\",\"src_ip\":\"IP address\",\"session\":\"c176278d5630\"}",
    "input": {
      "type": "log"
    },
    "ecs": {
      "version": "1.4.0"
    },
    "host": {
      "name": "instance-39"
    },
    "agent": {
      "type": "filebeat",
      "ephemeral_id": "07441667-3997-4039-b3bc-e48209f22e69",
      "hostname": "instance-39",
      "id": "f505e637-ece6-4dc5-ad66-5f986536eae4",
      "version": "7.6.1"
    }
  },
  "fields": {
    "suricata.eve.timestamp": [
      "2020-03-12T15:45:52.624Z"
    ],
    "@timestamp": [
      "2020-03-12T15:45:52.624Z"
    ]
  },
  "sort": [
    -9223372036854776000,
    1584027952624
  ]
}
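Note that message in the _source above is still one JSON-encoded string, not an object. What the json filter (with no target) is expected to do is roughly the following, sketched in plain Ruby against an abbreviated copy of that string:

```ruby
require 'json'

# Abbreviated copy of the raw "message" field value from the _source above.
message = '{"eventid":"cowrie.login.success","username":"test","src_ip":"IP address"}'

# Parse the string and merge the resulting keys into the event root.
event = { "@timestamp" => "2020-03-12T15:45:52.624Z" }
event.merge!(JSON.parse(message))
```

If the json filter were actually running, eventid, username, etc. would appear at the root like this instead of staying inside the message string.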

Now, there are several strange things. Mostly it's just as if my configuration file were not read, even though /var/log/logstash/logstash-plain.log mentions the right file, /etc/logstash/conf.d/logstash-cowrie.conf:

[2020-03-12T15:58:17,679][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/etc/logstash/conf.d/logstash-cowrie.conf"], :thread=>"#<Thread:0x60754869 run>"}

Here is my /etc/logstash/conf.d/logstash-cowrie.conf. The commented-out lines are things I tried.

input {
    beats {
        port => 5044
        type => "cowrie"
    }
}

filter {
#    if [type] == "cowrie" {
        json {
            source => "message"
        }
        date {
            match => [ "timestamp", "ISO8601" ]
        }

        if [src_ip] {

            mutate {
                add_field => { "src_host" => "%{src_ip}" }
            }

            dns {
                reverse => [ "src_host" ]
                nameserver => [ "8.8.8.8", "8.8.4.4" ]
                action => "replace"
                hit_cache_size => 4096
                hit_cache_ttl => 900
                failed_cache_size => 512
                failed_cache_ttl => 900
            }

            geoip {
                source => "src_ip"
                target => "geoip"
                database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
                #add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
                #add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
            }

        }

        mutate {
            # convert => [ "[geoip][coordinates]", "float" ]
            remove_tag => [ "beats_input_codec_plain_applied" ]
            remove_field => [ "log.file.path", "log.offset" ]
        }
#    }
}

output {
#    if [type] == "cowrie" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{type}"
        }
        file {
            path => "/tmp/cowrie-logstash.json"
            codec => json
        }
        stdout {
            codec => rubydebug
        }
#    }
}

Several things are not normal:

  • message is not in the right place
  • I still have log.file.path even though I ask to remove it
  • I get no src_host field, though it should be created since I have a src_ip
  • I have no /tmp/cowrie-logstash.json
  • shouldn't the line index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" create a new index each day, not only when I restart Logstash? Anyway, I only have filebeat-2020.03.10 and filebeat-2020.03.09 (and we're March 12).
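One likely culprit for the remove_field item: Logstash addresses nested fields with bracket syntax, not dots, so "log.file.path" names a (nonexistent) top-level field whose name literally contains dots. A hedged correction of just that mutate (the rest of the filter unchanged):

```conf
mutate {
    # [log][file][path] is the bracket-syntax reference to the nested field
    # shown in the _source above; "log.file.path" would not match it.
    remove_field => [ "[log][file][path]", "[log][offset]" ]
}
```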

This is driving me mad! 🙁
