Parse single-line multi-object JSON with Logstash

Hello,

We are collecting CloudWatch logs into a central logging account's S3 bucket using CloudWatch --> Log Destination --> Kinesis --> S3.

The S3 file has multiple JSON objects on a single line; below is a sample:

{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573519068908,"message": "{}"},{"id": "event-id","timestamp": 1573519068908,"message": "{}"}]}{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573518985345,"message": "{}"}]}

With the Logstash configuration below I can parse the first JSON object, but not the subsequent ones:

input {
        file {
                path => "sample_masked_log"
                codec => "json"
                type => "Log"
                sincedb_path => "/tmp/sincedb"
                start_position => "beginning"
        }
}
filter {
        json {
                source => "message"
        }
        split {
                field => "[logEvents]"
        }
        mutate {
                add_field => {
                        "log-event-id" => "%{[logEvents][id]}"
                        "log-event-message" => "%{[logEvents][message]}"
                        "log-event-time" => "%{[logEvents][timestamp]}"
                }
                remove_field => [ "[message]" ]
                remove_field => [ "[host]" ]
                remove_field => [ "[path]" ]
                remove_field => [ "[logEvents]" ]
                remove_field => [ "[subscriptionFilters]" ]
                remove_field => [ "[messageType]" ]
        }
}
output {
        stdout { codec => rubydebug }
}

If the log has multiple lines, one JSON object per line as below, then I get all the events:

{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573519068908,"message": "{}"},{"id": "event-id","timestamp": 1573519068908,"message": "{}"}]}
{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573518985345,"message": "{}"}]}

As I cannot change the source, is there a way to parse single-line multi-object JSON with Logstash?

Thanks in advance for any suggestions.

You could try using gsub to add a newline character between the JSON objects and then use the split plugin. This assumes that the objects appear back to back with no characters between them and that the }{ sequence doesn't appear inside any of the JSON values.

filter {
    mutate {
        gsub => [ "message", "\}\{", "}\n{" ]
    }
    split {
        field => "message"
    }
}

EDIT

Couldn't get the above to work, but it does seem to work if you embed a literal newline in the config like so (the Logstash config language does not interpret the \n escape in quoted strings by default, so the replacement has to contain an actual line break):

filter {
    mutate {
        gsub => [ "message", "\}\{", "}
{" ]
    }
    split {
        field => "message"
    }
}
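
For intuition, mutate's gsub is essentially Ruby's String#gsub under the hood. Here is a minimal standalone Ruby sketch of the same transform on two made-up objects (illustration only, not part of the pipeline):

require 'json'

# Two JSON objects concatenated on one line, as in the S3 file
line = '{"a":1}{"b":2}'

# Insert a newline at each object boundary, then split on it
parts = line.gsub('}{', "}\n{").split("\n")

# Each part is now a valid JSON document on its own
parts.each { |p| puts JSON.parse(p).inspect }
# {"a"=>1}
# {"b"=>2}

# Caveat: this breaks if "}{" ever appears inside a JSON string value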

Thank you Manu,

I tried your suggestion, but no luck.

With the config below:
input {
        file {
                path => "/Users/sreddapani001/Desktop/Elasticsearch/SampleLogs/sample_masked_log"
                codec => "json"
                type => "Log"
                sincedb_path => "/tmp/sincedb"
                start_position => "beginning"
        }
}
filter {
        json {
                source => "message"
        }
        # mutate {
        #         gsub => [ "message", "}{", "}
        # {" ]
        # }
        # split {
        #         field => "message"
        # }
        split {
                field => "[logEvents]"
        }
        mutate {
                add_field => {
                        "log-event-id" => "%{[logEvents][id]}"
                        "log-event-message" => "%{[logEvents][message]}"
                        "log-event-time" => "%{[logEvents][timestamp]}"
                }
                remove_field => [ "[message]" ]
                remove_field => [ "[host]" ]
                remove_field => [ "[path]" ]
                remove_field => [ "[logEvents]" ]
                remove_field => [ "[subscriptionFilters]" ]
                remove_field => [ "[messageType]" ]
        }
}
output {
        stdout { codec => rubydebug }
}

I am getting the two events from the first JSON block:

{
                 "type" => "Log",
         "log-event-id" => "event-id",
    "log-event-message" => "{}",
                "owner" => "owner-id",
             "logGroup" => "log-group",
       "log-event-time" => "1573519068908",
            "logStream" => "log-stream",
             "@version" => "1",
           "@timestamp" => 2019-11-13T05:34:39.765Z
}
{
                 "type" => "Log",
         "log-event-id" => "event-id",
    "log-event-message" => "{}",
                "owner" => "owner-id",
             "logGroup" => "log-group",
       "log-event-time" => "1573519068908",
            "logStream" => "log-stream",
             "@version" => "1",
           "@timestamp" => 2019-11-13T05:34:39.765Z
}

But if I uncomment your suggested section in the above configuration, I still get two events, now tagged with "_split_type_failure":

{
             "@version" => "1",
         "log-event-id" => "event-id",
            "logStream" => "log-stream",
    "log-event-message" => "{}",
           "@timestamp" => 2019-11-13T05:37:44.604Z,
             "logGroup" => "log-group",
                 "type" => "Log",
                 "tags" => [
        [0] "_split_type_failure"
    ],
       "log-event-time" => "1573519068908",
                "owner" => "owner-id"
}
{
             "@version" => "1",
         "log-event-id" => "event-id",
            "logStream" => "log-stream",
    "log-event-message" => "{}",
           "@timestamp" => 2019-11-13T05:37:44.604Z,
             "logGroup" => "log-group",
                 "type" => "Log",
                 "tags" => [
        [0] "_split_type_failure"
    ],
       "log-event-time" => "1573519068908",
                "owner" => "owner-id"
}

Move my suggestion to before the json filter:

filter {
    mutate {
        gsub => [ "message", "}{", "}
{" ]
    }
    split {
        field => "message"
    }
    json {
        source => "message"
    }
    split {
        field => "[logEvents]"
    }
    mutate {
        add_field => {
            "log-event-id" => "%{[logEvents][id]}"
            "log-event-message" => "%{[logEvents][message]}"
            "log-event-time" => "%{[logEvents][timestamp]}"
        }
        remove_field => [ 
            "[message]",
            "[host]",
            "[path]",
            "[logEvents]",
            "[subscriptionFilters]",
            "[messageType]"
        ]
    }
}

Thanks @manud, still no joy.

Configuration

input {
        file {
                path => "sample_masked_log"
                codec => "json"
                type => "Log"
                sincedb_path => "/tmp/sincedb"
                start_position => "beginning"
        }
}
filter {
        mutate {
                gsub => [ "message", "}{", "}
{" ]
        }
        split {
                field => "message"
        }
        json {
                source => "message"
        }

        split {
                field => "[logEvents]"
        }
        mutate {
                add_field => {
                        "log-event-id" => "%{[logEvents][id]}"
                        "log-event-message" => "%{[logEvents][message]}"
                        "log-event-time" => "%{[logEvents][timestamp]}"
                }
                remove_field => [
                        "[message]",
                        "[host]",
                        "[path]",
                        "[logEvents]",
                        "[subscriptionFilters]",
                        "[messageType]"
                ]
        }
}
output {
        stdout { codec => rubydebug }
}

Content of the sample_masked_log file:

{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573519068908,"message": "{}"},{"id": "event-id","timestamp": 1573519068908,"message": "{}"}]}{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573518985345,"message": "{}"}]}

Output

{
           "@timestamp" => 2019-11-13T18:34:20.892Z,
             "logGroup" => "log-group",
                 "type" => "Log",
       "log-event-time" => "1573519068908",
             "@version" => "1",
                 "tags" => [
        [0] "_split_type_failure"
    ],
            "logStream" => "log-stream",
    "log-event-message" => "{}",
                "owner" => "owner-id",
         "log-event-id" => "event-id"
}
{
           "@timestamp" => 2019-11-13T18:34:20.892Z,
             "logGroup" => "log-group",
                 "type" => "Log",
       "log-event-time" => "1573519068908",
             "@version" => "1",
                 "tags" => [
        [0] "_split_type_failure"
    ],
            "logStream" => "log-stream",
    "log-event-message" => "{}",
                "owner" => "owner-id",
         "log-event-id" => "event-id"
}

If you remove all of your filters and use

output { stdout { codec => rubydebug } }

what does an event look like?

Hi Badger,

I removed all the filters. With the config below:

input {
        file {
                path => "/Users/sreddapani001/Desktop/Elasticsearch/SampleLogs/sample_masked_log"
                codec => "json"
                type => "Log"
                sincedb_path => "/tmp/sincedb"
                start_position => "beginning"
        }
}
output {
        stdout { codec => rubydebug }
}

Output

{
    "subscriptionFilters" => [
        [0] "Destination"
    ],
               "@version" => "1",
                  "owner" => "owner-id",
               "logGroup" => "log-group",
                   "path" => "/Users/sreddapani001/Desktop/Elasticsearch/SampleLogs/sample_masked_log",
                   "host" => "AU_C02T92HMG8WN",
                   "type" => "Log",
              "logEvents" => [
        [0] {
            "timestamp" => 1573519068908,
                   "id" => "event-id",
              "message" => "{}"
        },
        [1] {
            "timestamp" => 1573519068908,
                   "id" => "event-id",
              "message" => "{}"
        }
    ],
              "logStream" => "log-stream",
            "messageType" => "DATA_MESSAGE",
             "@timestamp" => 2019-11-13T20:17:42.077Z
}

The codec parsed your JSON. You should be able to

split { field => "logEvents" }

if you want to.

Hi Badger,

The source file has three log events across the two JSON objects:

{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573519068908,"message": "{}"},{"id": "event-id","timestamp": 1573519068908,"message": "{}"}]}{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573518985345,"message": "{}"}]}

But I am getting only two messages in the output, both from the first JSON block; the parser is not looking into the second JSON block.

Actually I only get one, which is the last one. I believe this occurs because it is parsing the JSON into a hash, and if it sees a duplicate key it simply overwrites the first hash entry.

I suggest you post a new question where you are more explicit about there being duplicate logEvents keys in a single JSON object. I did not get that from the question and I don't think Manu did either.
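
To illustrate the overwriting: if the whole line ends up being treated as one document, a parser that tolerates duplicate keys silently keeps the last value. A minimal Ruby sketch with a made-up document (not the codec's actual code path):

require 'json'

# One object with the same key twice: no error, the last value wins
doc = JSON.parse('{"logEvents": [1, 2], "logEvents": [3]}')
puts doc.inspect
# {"logEvents"=>[3]}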

Thanks Badger. I don't believe this is related to duplicate logEvents.

I have added some distinguishing values to the source messages, and I still have the same issue:

{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573519068901,"message": "{"event":"1.0"}"},{"id": "event-id","timestamp": 1573519068908,"message": "{"event":"2.0"}"}]}{"messageType": "DATA_MESSAGE","owner": "owner-id","logGroup": "log-group","logStream": "log-stream","subscriptionFilters": ["Destination"],"logEvents": [{"id": "event-id","timestamp": 1573518985345,"message": "{"event":"3.0"}"}]}

I missed that you had the JSON codec in effect. I tested my solution without the JSON codec, since we want to split apart the distinct JSON objects before any deserialisation happens. You get a _split_type_failure because there is no message field to split on once the codec has already parsed the line.

If you remove the codec, the filter block receives all the objects as a single-line string in the message field; the gsub then turns it into a multiline field and split should work correctly.

I think @Badger is correct in saying that the JSON codec is parsing each of the objects and overwriting the fields each time.
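
Put differently, the order of operations matters. A rough Ruby sketch of the stages the filter block performs on the message string, using made-up events (illustrative only, not Logstash internals):

require 'json'

message = '{"logEvents":[{"id":"a"}]}{"logEvents":[{"id":"b"}]}'

# 1. mutate/gsub: turn each object boundary into a newline
multiline = message.gsub('}{', "}\n{")

# 2. split on message: one Logstash event per line
events = multiline.split("\n")

# 3. json filter: each event's message now parses cleanly on its own
events.each do |m|
  doc = JSON.parse(m)
  # 4. split on logEvents: one event per log entry
  doc["logEvents"].each { |e| puts e.inspect }
end
# {"id"=>"a"}
# {"id"=>"b"}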

Thank you both @manud and @Badger,

Yes, it was the json codec in the input section messing things up. I managed to make it work with the config below:

input {
	file {
		path => "/Users/sreddapani001/Desktop/Elasticsearch/SampleLogs/sample_masked_log"
#		codec => "json"
#		type => "Log"
		sincedb_path => "/tmp/sincedb"
		start_position => "beginning"
	}
}

filter {
	mutate {
		gsub => [ "message", "}{", "}
{" ]
	}
	split {
		field => "message"
	}
	json {
		source => "message"
	}
	split {
		field => "[logEvents]"
	}
	mutate {
		add_field => {
			"log-event-id" => "%{[logEvents][id]}"
			"log-event-message" => "%{[logEvents][message]}"
			"log-event-time" => "%{[logEvents][timestamp]}"
		}
		remove_field => [
			"[host]",
			"[path]",
			"[message]",
			"[logEvents]",
			"[subscriptionFilters]",
			"[messageType]"
		]
	}
}

output {
	stdout { codec => rubydebug }
}

I can see three events in the output:

{
             "@version" => "1",
           "@timestamp" => 2019-11-14T23:53:07.517Z,
             "logGroup" => "log-group",
            "logStream" => "log-stream",
    "log-event-message" => "{\"eventVersion\":\"1.05\"}",
       "log-event-time" => "1573519068901",
         "log-event-id" => "event-id",
                "owner" => "owner-id"
}
{
             "@version" => "1",
           "@timestamp" => 2019-11-14T23:53:07.517Z,
             "logGroup" => "log-group",
            "logStream" => "log-stream",
    "log-event-message" => "{\"eventVersion\":\"2.05\"}",
       "log-event-time" => "1573519068908",
         "log-event-id" => "event-id",
                "owner" => "owner-id"
}
{
             "@version" => "1",
           "@timestamp" => 2019-11-14T23:53:07.517Z,
             "logGroup" => "log-group",
            "logStream" => "log-stream",
    "log-event-message" => "{\"eventVersion\":\"3.05\"}",
       "log-event-time" => "1573518985345",
         "log-event-id" => "event-id",
                "owner" => "owner-id"
}

Thank you again, both of you; your help is much appreciated.
