JSON file parsing


(Will) #1

I have a JSON file from the Netskope REST API that I would like to bring into Elastic. I see output when I use the stdout plugin with rubydebug, but when I use the elasticsearch output I get a single entry that isn't really parsed. I see 117 fields when I create the index pattern in Kibana. I feel like I have missed something, and perhaps a more experienced person can see my error.
I am not able to share the actual JSON file but here is my config. The JSON file is NOT multiline.
input {
  file {
    path => [ "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" ]
    type => "netSkopeAPI"
    tags => "netSkopeAPI"
    #codec => "json"
  }
} #close input

filter {
  if [path] == "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" {
    json {
      source => "message"
    }
    date {
      match => [ "data._insertion_epoch_timestamp", "UNIX" ]
    }
  } #close if
} # close filter

output {
  #stdout {codec => rubydebug}
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "netskope-27071452"
  }
} #close output


(Will) #2

Here is a snippet of the one event that appears in Kibana:
msg:
path:
/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json
@timestamp:
July 27th 2017, 14:58:31.635
data:
{ "dst_region": "CA", "srcip": "198.102.213.33", "fromlogs": "yes", "dstport": 443, "src_zipcode": "02139 SNIP


(Will) #3

I think I had the original config totally wrong. Here is the second attempt.
input {
  file {
    path => [ "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" ]
    type => "netSkopeAPI"
    tags => "netSkopeAPI"
    codec => "json"
  }
} #close input

filter {
  if [path] == "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" {
    json {
      source => "message"
      target => "parsed"
    }

    mutate {
      add_field => { "newid" => "%{[parsed][_id]}" }
      add_field => { "[insertion_epoch_timestamp]" => "%{[parsed][_insertion_epoch_timestamp]}"}
    }
    date {
      match => [ "insertion_epoch_timestamp", "UNIX" ]
    }
  } #close if
} # close filter

output {
  stdout {codec => rubydebug}
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "netskope-27071852"
  }
} #close output
I get the following output on stdout (snipped for brevity):
{
    "msg" => "",
    "path" => "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json",
    "@timestamp" => 2017-07-28T13:50:34.836Z,
    "data" => [
        [0] {
            "dst_region" => "WA",
            "srcip" => "1.1.1.1",
            "fromlogs" => "yes",
            "dstport" => 80,
            "src_zipcode" => "unknown",
            "cci" => 50,
            "suppression_start_time" => 1501192800,
            SNIP SNIP SNIP
        }
    ],
    "newid" => "%{[parsed][_id]}", <------ WRONG
    "@version" => "1",
    "host" => "myhost.com",
    "type" => "netSkopeAPI",
    "status" => "success",
    "tags" => [
        [0] "netSkopeAPI"
    ]
}


(Will) #4

I got this to work based on a different posting from 2016 found here:

I think some additional documentation around JSON would be helpful. I am open to help if someone from Elastic can assist as I am pretty dangerously new to this.

My config now looks like this:
input {
  file {
    path => [ "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" ]
    type => "netSkopeAPI"
    tags => "netSkopeAPI"
    codec => "json"
  }
} #close input

filter {
  if [path] == "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" {

    split {
      field => "data"
    }

    mutate {
      rename => { "[data][dst_region]" => "dst_region" }
      rename => { "[data][srcip]" => "srcip" }
      rename => { "[data][fromlogs]" => "fromlogs" }
      rename => { "[data][dstport]" => "dstport" }
      rename => { "[data][src_zipcode]" => "src_zipcode" }
      rename => { "[data][cci]" => "cci" }
      rename => { "[data][suppression_start_time]" => "suppression_start_time" }
      rename => { "[data][req_cnt]" => "req_cnt" }
      rename => { "[data][type]" => "type" }
      rename => { "[data][page_duration]" => "page_duration" }
      rename => { "[data][ccl]" => "ccl" }
      rename => { "[data][_session_begin]" => "session_begin" }
      rename => { "[data][action]" => "action" }
      rename => { "[data][dstip]" => "dstip" }
      rename => { "[data][aggregated_user]" => "aggregated_user" }
      rename => { "[data][sfwder]" => "sfwder" }
      rename => { "[data][app]" => "app" }
      rename => { "[data][appcategory]" => "appcategory" }
      rename => { "[data][count]" => "count" }
      rename => { "[data][log_file_name]" => "log_file_name" }
      rename => { "[data][organization_unit]" => "organization_unit" }
      rename => { "[data][suppression_end_time]" => "suppression_end_time" }
      rename => { "[data][server_bytes]" => "server_bytes" }
      rename => { "[data][src_country]" => "src_country" }
      rename => { "[data][dst_zipcode]" => "dst_zipcode" }
      rename => { "[data][dst_longitude]" => "dst_longitude" }
      rename => { "[data][_id]" => "id" }
      rename => { "[data][numbytes]" => "numbytes" }
      rename => { "[data][device]" => "device" }
      rename => { "[data][_insertion_epoch_timestamp]" => "insertion_epoch_timestamp" }
      rename => { "[data][app_session_id]" => "app_session_id" }
      rename => { "[data][access_method]" => "access_method" }
      rename => { "[data][traffic_type]" => "traffic_type" }
      rename => { "[data][src_longitude]" => "src_longitude" }
      rename => { "[data][alert_type]" => "alert_type" }
      rename => { "[data][resp_cnt]" => "resp_cnt" }
      rename => { "[data][alert]" => "alert" }
      rename => { "[data][browser]" => "browser" }
      rename => { "[data][policy]" => "policy" }
      rename => { "[data][dst_latitude]" => "dst_latitude" }
      rename => { "[data][timestamp]" => "timestamp" }
      rename => { "[data][dst_country]" => "dst_country" }
      rename => { "[data][os]" => "os" }
      rename => { "[data][org]" => "org" }
      rename => { "[data][client_bytes]" => "client_bytes" }
      rename => { "[data][src_region]" => "src_region" }
      rename => { "[data][acked]" => "acked" }
      rename => { "[data][src_location]" => "src_location" }
      rename => { "[data][site]" => "site" }
      rename => { "[data][dst_location]" => "dst_location" }
      rename => { "[data][serial]" => "serial" }
      rename => { "[data][src_latitude]" => "src_latitude" }
      rename => { "[data][category]" => "category" }
      rename => { "[data][user]" => "user" }
      rename => { "[data][alert_name]" => "alert_name" }

      convert => {"src_latitude" => "float"}
      convert => {"dst_latitude" => "float"}
      convert => {"src_longitude" => "float"}
      convert => {"dst_longitude" => "float"}
    }
    date {
      match => [ "insertion_epoch_timestamp", "UNIX" ]
    }
  } #close if
} # close filter

output {
  stdout {codec => rubydebug}
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "netskope-28071138"
  }
} #close output
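
The long list of renames can probably be avoided. Here is a minimal sketch, assuming Logstash 5.0 or later (for the event.get / event.set API) and assuming that every key under [data] should be promoted to the top level. It swaps the mutate renames for a ruby filter, and it has not been run against the Netskope data:

```
filter {
  # One event per element of the [data] array, as in the config above.
  split {
    field => "data"
  }

  ruby {
    code => '
      data = event.get("data")
      if data.is_a?(Hash)
        data.each do |key, value|
          # Drop a leading underscore so that _id becomes id, and so on.
          name = key.start_with?("_") ? key[1..-1] : key
          event.set(name, value)
        end
        event.remove("data")
      end
    '
  }
}
```

Like the renames above, this drops the leading underscore from keys such as _id. It also overwrites any existing top-level field with the same name (type, for example). The convert and date filters would follow it unchanged.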


(Magnus Bäck) #5

I think some additional documentation around JSON would be helpful.

What is it that you want to see documented?


(Will) #6

I really struggled with understanding how to work with the JSON data. I first read about the Logstash JSON filter. The doc refers to source => "message". Is this the parent object of the JSON string? What do I do next to get to a point where I have data that is parsed and available for queries? I felt like I was missing the next step, and I still fail to grasp how to use the Logstash JSON filter. I searched some more and found where a Discuss poster had used split on the top-level field. I tried that and eventually got to where I could access the fields. I don't know if this is right, best practice, or whether I just got really lucky.
I ended up working with this in two different ways.
1: I split on the top-level field and then renamed, like this:
split {field => "data"}
mutate {
rename => { "[data][dsthost]" => "dst_host" }
rename => { "[data][dstport]" => "dstport" }
snip....
Once the fields are renamed I can work with the new fields for data type conversion etc.

2: The other way I approached this was to still use split, but rather than accessing the data in [parent_tag][child_data_tag] format I did this:
split { field => ["labels"] }
mutate {
convert => {"labels.created_on" => "integer"}
convert => {"labels.last_valid_on" => "integer"}
This approach seemed to work, and I didn't have to define tons of rename lines. In the index the fields were dotted, e.g. labels.name.
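
One caveat on this second approach, as far as I understand Logstash field references: nested fields are addressed with brackets, so "labels.created_on" names a top-level field with a literal dot in it rather than the created_on key inside labels. mutate skips a convert whose field does not exist, so the conversions above may have silently done nothing. The dotted names in the index are simply how nested objects are displayed there. A minimal sketch of the bracketed form, assuming labels is an array of objects that carry created_on and last_valid_on keys as in the fragment above:

```
filter {
  # One event per element of the [labels] array.
  split {
    field => "labels"
  }

  mutate {
    # Bracket syntax reaches into the nested object left by the split.
    convert => {
      "[labels][created_on]"    => "integer"
      "[labels][last_valid_on]" => "integer"
    }
  }
}
```

The rubydebug output on stdout is a quick way to check whether the values now print as numbers rather than quoted strings.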
I guess my point is that the documentation is really good, but I ran into a situation where I did not have a clue what to do next, and some concrete examples would have been helpful. If you would like, I can help write up a how-to on parsing JSON into useful fields for visualization or query so the next person doesn't have to struggle. Please bear in mind that I have been using ES for about 4 weeks, have not done any training, and am relying solely on the Discuss forum and the documentation as my guide. If my efforts are not too far off base with respect to parsing JSON data, I would be more than happy to write up a how-to on parsing and indexing JSON and send it to you.


(Magnus Bäck) #7

I really struggled with understanding how to work with the JSON data. I first read about the Logstash JSON filter. The doc refers to source => “message” Is this the parent object of the JSON string?

I'm not sure I understand what "parent object of the JSON string" means. The documentation says this about the source option:

For example, if you have JSON data in the message field:

filter {
json {
source => "message"
}
}
The above would parse the json from the message field

I'm not sure how this can be written to make it more clear that the source option should contain the name of the field containing the JSON string to parse.
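
Applied to the configs in this thread, a sketch of the case the json filter is meant for, assuming the file holds one JSON document per line and the file input has no codec, so each line arrives as a plain string in the message field:

```
input {
  file {
    path => [ "/Users/schroew/Documents/Scripts/NetskopeAPI/allNetSkopeEvents.json" ]
    # No json codec here: each line is stored as a string in [message].
  }
}

filter {
  json {
    # Name of the field that holds the JSON text to parse.
    source => "message"
  }
}
```

With codec => "json" on the input, the line is decoded before the filters run, so there is normally no message field left and the json filter has nothing to parse. That would also explain the literal %{[parsed][_id]} in the earlier stdout output: the parsed field was never created, so the reference was left unexpanded.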

What do I do next to get to a point where I have data that is parsed and available for queries?

Nothing? Well, except having an elasticsearch output if that's the kind of queries you want to ask. But that's unrelated to JSON parsing.

I guess my point is that the documentation is really good, but I ran into a situation where I did not have a clue what to do next, and some concrete examples would have been helpful. If you would like, I can help write up a how-to on parsing JSON into useful fields for visualization or query so the next person doesn’t have to struggle.

Perhaps Elastic's tech writer @dedemorton can help out here?

