Parsing Nested JSON with Array

Hi Team,

Can anyone help me parse the JSON below and store it in Elasticsearch? I am new to Logstash.
I want to store the metrics in Elasticsearch.

{
"collectionTime": {
"timeStamp": "2020-08-04T17:27:16Z",
"epoch": 1596562036
},
"points": [{
"objectType": "queue",
"tags": {
"description": "-",
"platform": "UNIX",
"qmgr": "ABC",
"queue": "XYZ",
"usage": "NORMAL"
},
"metrics": {
"averageQueueTimeSeconds": 0,
"destructiveMqgetNonPersistentBytes": 0,
"destructiveMqgetNonPersistentMessageCount": 0,
"destructiveMqgetPersistentBytes": 0,
"destructiveMqgetPersistentMessageCount": 0,
"expiredMessages": 0,
"lockContentionPercentage": 0,
"mqcloseCount": 0,
"mqgetBrowseNonPersistentBytes": 0,
"mqgetBrowseNonPersistentMessageCount": 0,
"mqgetBrowsePersistentBytes": 0,
"mqgetBrowsePersistentMessageCount": 0,
"mqgetBytes": 0,
"mqgetCount": 0,
"mqinqCount": 0,
"mqopenCount": 0,
"mqput1NonPersistentMessageCount": 0,
"mqput1PersistentMessageCount": 0,
"mqputBytes": 0,
"mqputMqput1Count": 0,
"mqputNonPersistentMessageCount": 0,
"mqputPersistentMessageCount": 0,
"mqsetCount": 0,
"nonPersistentBytes": 0,
"persistentBytes": 0,
"queueAvoidedPercentage": 0,
"queueAvoidedPutsPercentage": 0,
"queueDepth": 0,
"queuePurgedCount": 0
}
},
{
"objectType": "queueManager",
"tags": {
"platform": "UNIX",
"qmgr": "AZAP01NP"
},
"metrics": {
"alterDurableSubscriptionCount": 0,
"commitCount": 0,
"concurrentConnectionsHighWaterMark": 26,
"cpuLoadFifteenMinuteAveragePercentage": 0.13,
"cpuLoadFiveMinuteAveragePercentage": 0.12,
"cpuLoadOneMinuteAveragePercentage": 0.22,
"createDurableSubscriptionCount": 0,
"createNonDurableSubscriptionCount": 10,
"deleteDurableSubscriptionCount": 0,
"deleteNonDurableSubscriptionCount": 0,
"durableSubscriberHighWaterMark": 2,
"durableSubscriberLowWaterMark": 0,
"expiredMessageCount": 0,
"failedBrowseCount": 0,
"failedCreateAlterResumeSubscriptionCount": 0,
"failedMqcbCount": 0,
"failedMqcloseCount": 0,
"failedMqconnMqconnxCount": 0,
"failedMqgetCount": 16,
"failedMqinqCount": 0,
"failedMqopenCount": 0,
"failedMqput1Count": 0,
"failedMqputCount": 0,
"failedMqsetCount": 0,
"failedMqsubrqCount": 0,
"failedTopicMqputMqput1Count": 0,
"gotNonPersistentMessagesBytes": 22520,
"gotPersistentMessagesBytes": 0,
"intervalDestructiveGetTotalBytes": 22520,
"intervalDestructiveGetTotalCount": 77,
"intervalMqputMqput1TotalBytes": 41932,
"intervalMqputMqput1TotalCount": 120,
"intervalTopicPutTotal": 36472,
"logCurrentPrimarySpaceInUsePercentage": 4.03,
"logFileSystemInUseBytes": 19145949184,
"logFileSystemMaxBytes": 109963223498752,
"logInUseBytes": 5368627200,
"logLogicalWrittenBytes": 0,
"logMaxBytes": 8052940800,
"logPhysicalWrittenBytes": 0,
"logWorkloadPrimarySpaceUtilizationPercentage": 4.03,
"logWriteLatencySeconds": 0.001481,
"logWriteSizeBytes": 6246,
"mqErrorsFileSystemFreeSpacePercentage": 55.73,
"mqErrorsFileSystemInUseBytes": 3796893696,
"mqFdcFileCount": 628,
"mqTraceFileSystemFreeSpacePercentage": 55.73,
"mqTraceFileSystemInUseBytes": 3796893696,
"mqcbCount": 3,
"mqcloseCount": 105,
"mqconnMqconnxCount": 4,
"mqctlCount": 2,
"mqdiscCount": 2,
"mqinqCount": 33,
"mqopenCount": 108,
"mqsetCount": 0,
"mqstatCount": 0,
"mqsubrqCount": 0,
"nonDurableSubscriberHighWaterMark": 25,
"nonDurableSubscriberLowWaterMark": 0,
"nonPersistentMessageBrowseBytes": 0,
"nonPersistentMessageBrowseCount": 0,
"nonPersistentMessageDestructiveGetCount": 77,
"nonPersistentMessageMqput1Count": 0,
"nonPersistentMessageMqputCount": 120,
"nonPersistentTopicMqputMqput1Count": 106,
"persistentMessageBrowseBytes": 0,
"persistentMessageBrowseCount": 0,
"persistentMessageDestructiveGetCount": 0,
"persistentMessageMqput1Count": 0,
"persistentMessageMqputCount": 0,
"persistentTopicMqputMqput1Count": 0,
"publishedToSubscribersBytes": 28408,
"publishedToSubscribersMessageCount": 88,
"purgedQueueCount": 0,
"putNonPersistentMessagesBytes": 41932,
"putPersistentMessagesBytes": 0,
"queueManagerFileSystemFreeSpacePercentage": 99.99,
"queueManagerFileSystemInUseBytes": 2312110080,
"ramFreePercentage": 79.01,
"ramTotalBytes": 7125073920,
"ramTotalEstimateForQueueManagerBytes": 162529280,
"resumeDurableSubscriptionCount": 0,
"rollbackCount": 0,
"subscriptionDeleteFailureCount": 0,
"systemCpuTimeEstimateForQueueManagerPercentage": 0.18,
"systemCpuTimePercentage": 0.35,
"topicMqputMqput1IntervalTotal": 106,
"userCpuTimeEstimateForQueueManagerPercentage": 0.1,
"userCpuTimePercentage": 0.5
}
}
]
}

If it is valid JSON, which it appears to be, then a json filter will parse it...

json { source => "message" }

What would you like to change in the results of that?

I don't want to change anything, but I want to store these in ES as separate fields. Currently it is being stored as one blob, where the full JSON ends up in the message field.

@nikhilsuryan If you don't want to change anything then what @Badger posted should work.

    filter {
      json {
        source => "message"
      }
    }

Thanks for the response, but everything goes into the message field like below. How can we create a field for each of the elements here? It's not the complete document, just an extract.

{ "collectionTime": { "timeStamp": "2020-08-04T17:27:16Z", "epoch": 1596562036 }, "points": [ { "objectType": "queue", "tags": { "description": "-", "platform": "UNIX", "qmgr": "ABC", "queue": "XYZ", "usage": "NORMAL" }, "metrics": { "averageQueueTimeSeconds": 0, "destructiveMqgetNonPersistentBytes": 0, "destructiveMqgetNonPersistentMessageCount": 0, "destructiveMqgetPersistentBytes": 0, "destructiveMqgetPersistentMessageCount": 0, "expiredMessages": 0, "lockContentionPercentage": 0, "mqcloseCount": 0, "mqgetBrowseNonPersistentBytes": 0, "mqgetBrowseNonPersistentMessageCount": 0, "mqgetBrowsePersistentBytes": 0, "mqgetBrowsePersistentMessageCount": 0, "mqgetBytes": 0, "mqgetCount": 0, "mqinqCount": 0, "mqopenCount": 0, "mqput1NonPersistentMessageCount": 0, "mqput1PersistentMessageCount": 0, "mqputBytes": 0, "mqputMqput1Count": 0, "mqputNonPersistentMessageCount": 0, "mqputPersistentMessageCount": 0, "mqsetCount": 0, "nonPersistentBytes": 0, "persistentBytes": 0, "queueAvoidedPercentage": 0, "queueAvoidedPutsPercentage": 0, "queueDepth": 0, "queuePurgedCount": 0 } }, { "objectType": "queueManager", "tags": { "platform": "UNIX", "qmgr": "AZAP01NP" }, "metrics": { "alterDurableSubscriptionCount": 0, "commitCount": 0, "concurrentConnectionsHighWaterMark": 26, "cpuLoadFifteenMinuteAveragePercentage": 0.13, "cpuLoadFiveMinuteAveragePercentage": 0.12, "cpuLoadOneMinuteAveragePercentage": 0.22, "createDurableSubscriptionCount": 0, "createNonDurableSubscriptionCount": 10, "deleteDurableSubscriptionCount": 0, "deleteNonDurableSubscriptionCount": 0, "durableSubscriberHighWaterMark": 2, "durableSubscriberLowWaterMark": 0, "expiredMessageCount": 0, "failedBrowseCount": 0, "failedCreateAlterResumeSubscriptionCount": 0, "failedMqcbCount": 0, "failedMqcloseCount": 0, "failedMqconnMqconnxCount": 0, "failedMqgetCount": 16, "failedMqinqCount": 0, "failedMqopenCount": 0, "failedMqput1Count": 0, "failedMqputCount": 0, "failedMqsetCount": 0, "failedMqsubrqCount": 0, 
"failedTopicMqputMqput1Count": 0, "gotNonPersistentMessagesBytes": 22520, "gotPersistentMessagesBytes": 0, "intervalDestructiveGetTotalBytes": 22520, "intervalDestructiveGetTotalCount": 77, "intervalMqputMqput1TotalBytes": 41932, "intervalMqputMqput1TotalCount": 120, "intervalTopicPutTotal": 36472, "logCurrentPrimarySpaceInUsePercentage": 4.03, "logFileSystemInUseBytes": 19145949184, "logFileSystemMaxBytes": 109963223498752, "logInUseBytes": 5368627200, "logLogicalWrittenBytes": 0, "logMaxBytes": 8052940800, "logPhysicalWrittenBytes": 0, "logWorkloadPrimarySpaceUtilizationPercentage": 4.03, "logWriteLatencySeconds": 0.001481, "logWriteSizeBytes": 6246, "mqErrorsFileSystemFreeSpacePercentage": 55.73, "mqErrorsFileSystemInUseBytes": 3796893696, "mqFdcFileCount": 628, "mqTraceFileSystemFreeSpacePercentage": 55.73, "mqTraceFileSystemInUseBytes": 3796893696,

As we said, if you use a json filter it will parse all the fields. It will not modify the message field. If you want the message field removed if it is successfully parsed then use

filter {
    json {
        source => "message"
        remove_field => [ "message" ]
    }
}

Thanks for coming back on this. I tried this, but it removes the complete message.

{
"_index": "mqstats",
"_type": "doc",
"_id": "WjJ1v3MBzv2MNDH67F0z",
"_version": 1,
"_score": 2,
"_source": {
"@version": "1",
"host": "rmg-ne-big-npd-mq-1.big.poc",
"path": "/var/mqm/errors/mq_json.out",
"@timestamp": "2020-08-05T16:30:51.412Z",
"tags": [
"multiline"
]
},
"fields": {
"@timestamp": [
"2020-08-05T16:30:51.412Z"
]
}
}

I want to store each field in ES and query the stats from Kibana. The data is not being stored as individual fields; instead it goes in as one complete JSON document, which won't help in getting the numbers — for example, I have "mqTraceFileSystemFreeSpacePercentage": 55.73.

OK, that is very strange. If the remove_field was executed, then parsing the JSON succeeded, and collectionTime, metrics, points, etc. should have been added to the event. If you use

 output { stdout { codec => rubydebug } }

what does the output look like, with and without the remove_field?

input {
file {

    codec => multiline {
         pattern => "^\["
         negate => true
         what => previous
         max_lines => 2000
    }

    path => ['/var/mqm/errors/mq_json.out']
    start_position => "beginning"
    sincedb_path => "/dev/null"
}

}
filter {

# parse the json and remove the string field upon success

json {
    source => "message"
    remove_field => [ "points" ]
    target => "message"
}

split { field => "[message][points]" }
mutate {
gsub => ["[message]", "[\n]", ""]
gsub => ["[message]", "[\]", ""]
}
}
output {
elasticsearch {
hosts => ["xx.xx.xx.xx:9000"]
index => "mqstats"
}
}

This is what I have used. I used split to make each element of points a separate event, and the split is working — I am getting the points from the array fine. But after the split, each entry looks like below:

{
"_index": "mqstats",
"_type": "doc",
"_id": "jjKMv3MBzv2MNDH6SF2o",
"_version": 1,
"_score": 2,
"_source": {
"@timestamp": "2020-08-05T16:55:16.666Z",
"host": "rmg-ne-big-npd-mq-1.big.poc",
"message": {
"collectionTime": {
"timeStamp": "2020-08-04T17:27:16Z",
"epoch": 1596562036
},
"points": {
"tags": {
"platform": "UNIX",
"qmgr": "ABC"
},
"metrics": {
"failedMqput1Count": 0,
"logWriteLatencySeconds": 0.001481,
"mqinqCount": 33,
"deleteNonDurableSubscriptionCount": 0,
"commitCount": 0,
"mqFdcFileCount": 628,
"logWriteSizeBytes": 6246,
"alterDurableSubscriptionCount": 0,
"persistentTopicMqputMqput1Count": 0,
"userCpuTimePercentage": 0.5,
"persistentMessageBrowseBytes": 0,
"failedMqgetCount": 16,
"mqctlCount": 2,
"systemCpuTimeEstimateForQueueManagerPercentage": 0.18,
"failedMqconnMqconnxCount": 0,
"deleteDurableSubscriptionCount": 0,
"mqstatCount": 0,
"durableSubscriberHighWaterMark": 2,
"queueManagerFileSystemInUseBytes": 2312110080,
"nonPersistentMessageBrowseBytes": 0,
"failedMqopenCount": 0,
"persistentMessageDestructiveGetCount": 0,
"failedMqsubrqCount": 0,
"persistentMessageMqput1Count": 0,
"ramFreePercentage": 79.01,
"putNonPersistentMessagesBytes": 41932,
"logFileSystemMaxBytes": 109963223498752,
"durableSubscriberLowWaterMark": 0,
"mqcloseCount": 105,
"nonPersistentMessageDestructiveGetCount": 77,
"persistentMessageBrowseCount": 0,
"cpuLoadFifteenMinuteAveragePercentage": 0.13,
"logFileSystemInUseBytes": 19145949184,
"logInUseBytes": 5368627200,
"logCurrentPrimarySpaceInUsePercentage": 4.03,
"intervalMqputMqput1TotalCount": 120,
"logWorkloadPrimarySpaceUtilizationPercentage": 4.03,
"gotPersistentMessagesBytes": 0,
"subscriptionDeleteFailureCount": 0,
"userCpuTimeEstimateForQueueManagerPercentage": 0.1,
"mqErrorsFileSystemFreeSpacePercentage": 55.73,
"persistentMessageMqputCount": 0,
"createDurableSubscriptionCount": 0,
"failedCreateAlterResumeSubscriptionCount": 0,
"queueManagerFileSystemFreeSpacePercentage": 99.99,
"mqsetCount": 0,
"mqconnMqconnxCount": 4,
"failedBrowseCount": 0,
"failedMqputCount": 0,
"gotNonPersistentMessagesBytes": 22520,
"purgedQueueCount": 0,
"ramTotalEstimateForQueueManagerBytes": 162529280,
"mqcbCount": 3,
"nonPersistentMessageBrowseCount": 0,
"failedMqinqCount": 0,
"nonPersistentMessageMqputCount": 120,
"intervalDestructiveGetTotalBytes": 22520,
"rollbackCount": 0,
"mqTraceFileSystemInUseBytes": 3796893696,
"failedMqcbCount": 0,
"mqdiscCount": 2,
"ramTotalBytes": 7125073920,
"topicMqputMqput1IntervalTotal": 106,
"mqErrorsFileSystemInUseBytes": 3796893696,
"mqopenCount": 108,
"concurrentConnectionsHighWaterMark": 26,
"createNonDurableSubscriptionCount": 10,
"nonDurableSubscriberHighWaterMark": 25,
"publishedToSubscribersMessageCount": 88,
"nonPersistentTopicMqputMqput1Count": 106,
"logPhysicalWrittenBytes": 0,
"resumeDurableSubscriptionCount": 0,
"nonPersistentMessageMqput1Count": 0,
"putPersistentMessagesBytes": 0,
"logLogicalWrittenBytes": 0,
"mqsubrqCount": 0,
"expiredMessageCount": 0,
"nonDurableSubscriberLowWaterMark": 0,
"systemCpuTimePercentage": 0.35,
"cpuLoadOneMinuteAveragePercentage": 0.22,
"mqTraceFileSystemFreeSpacePercentage": 55.73,
"intervalMqputMqput1TotalBytes": 41932,
"cpuLoadFiveMinuteAveragePercentage": 0.12,
"failedMqsetCount": 0,
"intervalDestructiveGetTotalCount": 77,
"publishedToSubscribersBytes": 28408,
"failedMqcloseCount": 0,
"logMaxBytes": 8052940800,
"intervalTopicPutTotal": 36472,
"failedTopicMqputMqput1Count": 0
},
"objectType": "queueManager"
}
},
"tags": [
"multiline"
],
"path": "/var/mqm/errors/mq_json.out",
"@version": "1"
},
"fields": {
"@timestamp": [
"2020-08-05T16:55:16.666Z"
]
}
}

Do you have a problem with that?

@nikhilsuryan Is the issue that you are looking to flatten everything so all fields are at the root level?

Thanks for replying. Yes, it's still not what I want:

As an example:

For an "objectType": "queueManager" with "qmgr": "ABC", what is the value of "logWriteSizeBytes": 6246 — how can we search like this?

Thanks for coming back:

Yes, actually I am looking for something like this, so that each element is searchable.

You could copy the fields you want to the root level and remove the ones you don't want in the logstash JSON filter. Or you can leave as is and map as a nested data type.

Then when querying the data you would use dot notation. (points.metrics.alterDurableSubscriptionCount)

https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html

Does that sound like what you are looking for?

Thanks. I think the nested approach will be more complex, from the feel of it.

How can we copy the fields to the root level? Do you have any example code to achieve this?

See this post.

Thanks a lot for the help — using your code, I can copy the fields to the root. Now I have a new problem: my file is large, and after one entry or array it is getting truncated.
Can I make Logstash read only the new entries instead of reading the whole file?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.