Parsing a JSON file with Logstash

Happy new year everyone!

Hoping someone can shed some light on this: I have a weird issue I can't sort out with parsing a JSON file.

This is the source JSON file:

{
  "SHA256": "766be5c99ba674f985ce844add4bc5ec423e90811fbceer5ec84efa3cf1624f4",
  "source": "localhost",
  "Msg": "404 OK",
  "YaraRule": [
    "no_match"
  ],
  "URL": "http://127.0.0.1",
  "@timestamp": "2020-01-07T08:59:04",
  "raw_msg": "404.19 – Denied by filtering rule",
  "filename": "log.txt",
  "syntax": "text",
  "log_url": "http://127.0.0.1/log.txt",
  "MD5": "2c5cddf13ab55a1d4eca955dfa32d245",
  "expire": "0",
  "user": "user",
  "key": "22op3dfe",
  "size": 107
}

When I run this Logstash conf against it, the data is ingested, but each line ends up as a separate doc in ES instead of a single doc.

input {
  file {
    path => "/opt/data/*"
    start_position => "beginning"
    codec => "json_lines"
    sincedb_path => "/opt/logreader.sincedb"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.136.144:9200"]
    index => "log-test-%{+YYYY.MM.dd}"
  }
}

So I whipped this up and ran it, and nothing is being ingested at all! Yet the Logstash logs show no errors:

input {
  file {
    path => "/opt/data/*"
    start_position => "beginning"
    codec => "json_lines"
    sincedb_path => "/opt/logreader.sincedb"
  }
}
filter {
  json {
    source => "message"
    target => "doc"
    add_field => [ "Encryption", "%{[string]}" ]
    add_field => [ "source", "%{[string]}" ]
    add_field => [ "msg", "%{[WORD]}" ]
    add_field => [ "YaraRule", "%{[WORD]}" ]
    add_field => [ "status", "%{[WORD][WORD]}" ]
    add_field => [ "url", "%{[URL]}" ]
    add_field => [ "timestamp", "%{[TIMESTAMP]}" ]
    add_field => [ "rawmsg", "%{[raw_msg]}" ]
    add_field => [ "filename", "%{[filename]}" ]
    add_field => [ "syntax", "%{[word]}" ]
    add_field => [ "log_url", "%{[URL]}" ]
    add_field => [ "MD5", "%{[MD5]}" ]
    add_field => [ "expire", "%{[num]}" ]
    add_field => [ "user", "%{[USER]}" ]
    add_field => [ "key", "%{[key]}" ]
    add_field => [ "size", "%{[num]}" ]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.136.144:9200"]
    index => "log-test-%{+YYYY.MM.dd}"
  }
}

So I guess my question is: is this the right way, or is there an easier way to get the JSON file ingested as a single document rather than one doc per JSON line?

Thanks

The json_lines codec expects each line to contain an entire JSON object, so that's not going to work if your object is pretty-printed across multiple lines.

You need to use a multiline codec on the input. If you want to consume the entire file as a single event then you can use a pattern that never matches. I use

codec => multiline { pattern => "^Spalanzani" negate => true what => previous auto_flush_interval => 1 multiline_tag => "" }

If you have multiple JSON objects in a file you will need to work out the appropriate regexp for yourself.
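
For reference, a sketch of how that codec would sit inside your file input (the path and sincedb settings are copied from your config above, the codec options from the line above):

input {
  file {
    path => "/opt/data/*"
    start_position => "beginning"
    sincedb_path => "/opt/logreader.sincedb"
    codec => multiline {
      # "^Spalanzani" never matches, so every line is joined to the previous one
      # and the whole file is flushed as a single event after 1 second of inactivity
      pattern => "^Spalanzani"
      negate => true
      what => previous
      auto_flush_interval => 1
      multiline_tag => ""
    }
  }
}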

Hi

I think you'd be fine if you used codec => "json" instead of json_lines in your file{} input. Maybe not if your JSON object spans multiple lines though, as @Badger said.

Give it a try with no filters, and just stdout{} as output to see what you get.
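
Something like this, just to inspect the events Logstash produces (the path comes from your config; sincedb_path is pointed at /dev/null so the file is re-read from the beginning on every test run):

input {
  file {
    path => "/opt/data/*"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}
output {
  # print each event to the console for inspection
  stdout { codec => rubydebug }
}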

Hope this helps

thanks guys.

@Badger your idea works, but EVERYTHING is pushed into the message field, so the entire JSON blob is in there, which isn't what I was aiming for.

@ITIC I tried the json codec and it separates every line of the JSON file into its own doc, so for a single JSON source doc with 16 lines of data I get 16 separate docs, each with that line in its own message field.

This was why I was trying my filter with add_field, mapping new fields against the source doc, although I don't understand why it doesn't work. In the filter section, after I declare the new fields, would I then need a grok pattern to push the data into them? Or is it a case of, once the fields are added, I have to split the data out into each separate field?

Hi

So, using the approach @Badger suggested, you get one single message that looks like this:

..., "source": "localhost", "Msg": "404 OK", ...

Right?

If that's the case, you could try to use a kv{} filter with field_split => "," and you should get all your fields automagically created. (https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html)
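
A rough sketch of that, assuming the whole blob is sitting in the message field; the value_split and trim options are guesses on my part, and values that contain colons themselves (like your URLs) may still need extra handling:

filter {
  kv {
    source => "message"
    field_split => ","
    value_split => ":"
    # strip the spaces, quotes and braces left over from the JSON syntax
    trim_key => ' "{'
    trim_value => ' "}'
  }
}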

Hope this helps.

@ITIC yup, that's exactly it. All the JSON fields are squashed into the message field.

I was just looking at kv and splitting :stuck_out_tongue: spooky, eh?

I'll post back if it works.

thanks

If the entire JSON object is in the message field then you can use a json filter to parse it.
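
Something like this (it drops the raw message once it has been parsed):

filter {
  json {
    source => "message"
    remove_field => [ "message" ]
  }
}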

@Badger, so the source file is at the top of this post. My input is using the json codec, my filter (which I'm currently experimenting with, using kv and field splitting) isn't using a json filter, and output goes straight to ES (or stdout when testing).

The source JSON file itself has 16 fields, pretty static, and only the data in the fields changes. So I wanted Logstash to read the JSON file, create a single doc and ingest it into ES, which I think is the more efficient route than having 16 docs, each with a message field holding one of the 16 source fields.

This is the problem I'm facing: the json filter doesn't parse the JSON source document the way I expect, and it either shoves it all into a single message field in ES or splits the source file up into 16 individual docs, which is pretty insane as it's effectively doing a kv split into individual docs all on its own.

So thus far my filter looks like this:

filter {
  kv {
      source => "json"
      include_keys => [ "Encryption","source","Msg","YaraRule","status","URL","timestamp","raw_msg","filename","syntax","log_url","MD5","expire","user","key","size" ]
      transform_key => "lowercase"
  }
}

So this just does exactly the same as the original Json codec :confused:

As for using a grok statement afterwards: I first tried

target => "kv"
My understanding of this is that in its default setting, i.e. no target defined, the fields are listed at the root as individual fields, so removing it should achieve what I need, but it didn't.

and then

grok {
  match => { "message" => "%{GREEDYDATA:kv}" }
}

and didn't have any luck. Using this method was the same as shoving it all into a new field called "kv", just like at the beginning when all the JSON fields were shoved into the "message" field.

With that input file and

input {
  file {
    path => "/home/user/foo.txt"
    sincedb_path => "/dev/null"
    start_position => beginning
    codec => multiline { pattern => "^Spalanzani" negate => true what => previous auto_flush_interval => 1 multiline_tag => "" }
  }
}
filter { json { source => "message" remove_field => [ "message" ] } }
output { stdout { codec => rubydebug { metadata => false } } }

I get a single event

{
   "log_url" => "http://127.0.0.1/log.txt",
       "key" => "22op3dfe",
   "raw_msg" => "404.19 – Denied by filtering rule",
       "MD5" => "2c5cddf13ab55a1d4eca955dfa32d245",
    "syntax" => "text",
  "@version" => "1",
    "SHA256" => "766be5c99ba674f985ce844add4bc5ec423e90811fbceer5ec84efa3cf1624f4",
      "user" => "user",
       "URL" => "http://127.0.0.1",
  "YaraRule" => [
    [0] "no_match"
],
    "expire" => "0",
      "size" => 107,
    "source" => "localhost",
       "Msg" => "404 OK",
  "filename" => "log.txt",
"@timestamp" => 2020-01-07T13:59:04.000Z
}

Thanks Badger, that works and drops it all into the message field, so now I'm working on splitting the message field out into separate fields.
