Which filter(s) to extract array of JSON values, then use array to look up nested JSON objects?

I have a Logstash pipeline which processes JSON data from a flat file. The data is structured somewhat like this:

{
  "name": "job1",
  "tasks": {
    "75fc": {
      "name": "restAction",
      "variables": {
        "incoming": {
          "params": {
            "path": "/restconf/data/thefile",
            "accept": "application/yang-data+json",
            "method": "PUT",
            "contentType": "application/yang-data+json"
          },
          "body": {
            "cust-eplan:cust-eplan": [
              {
                "customer-name": "Decostumer",
                "device": [
                  {
                    "device-name": "my-dvice",
                    "interface": {
                      "GigabitEthernet-iosxr": [
                        {
                          "interface-id": "0/0/0/32",
                          "encapsulation": "default",
                          "mep-id": 3
                        }
                      ]
                    }
                  },
                  {
                    "device-name": "my-device2",
                    "interface": {
                      "GigabitEthernet-iosxr": [
                        {
                          "interface-id": "0/0/0/32",
                          "encapsulation": "default",
                          "mep-id": 4
                        }
                      ]
                    }
                  }
                ]
              }
            ]
          }
        }
      }
    }
  },
  "error": [
    {
      "task": "75fc",
      "message": {
        "ietf-restconf:errors": {
          "error": [
            {
              "error-type": "application",
              "error-tag": "malformed-message",
              "error-path": "/pathto/problem",
              "error-message": "missing element: name in thepath"
            }
          ]
        }
      },
      "timestamp": 1.695060733555E+12
    },
    {
      "task": "job",
      "message": "Job has no available transitions. 6649, cb04, f5dd could have led to the workflow end task, but did not. These tasks performed in a way that the end of the workflow could not be reached.",
      "timestamp": 1.69506073357E+12
    }
  ]
}

I need to extract the array of [error][task] values, then use that array to look up the matching [tasks][&lt;taskId&gt;][variables][incoming] values, and add them to a new field called errored_task_incoming_variables or something like that.

So for example, if the JSON data has [error][task] values of ["75fc", "75fd", "75fe"], the pipeline should then fetch these values (a sketch of the combined result follows):

[tasks]["75fc"][variables][incoming]
[tasks]["75fd"][variables][incoming]
[tasks]["75fe"][variables][incoming]

Is the ruby filter the only option to accomplish my goal? Or can I use a combination of the aggregate and json filters? Or do I need to use all three?

Ok, so I'm working on the first part of the desired solution, which is to parse the array of [error][task] values.

I added this json filter to my filter section:

filter {
  json {
    source => "[error][task]"
    target => "error_tasks"
  }

  mutate {
    remove_field => ["created", "last_updated", "tasks", "transitions", "variables", "watchers", "ancestors", "decorators"]
    rename => {"[metrics][start_time]" => "start_time"}
  }

 # more filters
}

However, there is no error_tasks object or array in the document after it is indexed into ES. The source JSON looks like this:

"error": [
      {
        "timestamp": 1696461875958,
        "message": "First parameter must be a Number",
        "task": "c1b"
      },
      {
        "timestamp": 1696461875969,
        "message": "Job has no available transitions. d6d2 could have led to the workflow end task, but did not. These tasks performed in a way that the end of the workflow could not be reached.",
        "task": "job"
      }
    ]

There should have been an error_tasks field created, looking like the example below, but it is missing:

 "error_tasks": [
      "c1b", "job"
    ]

Any ideas what might be wrong with my json filter?

Maybe I need to do a mutate add_field first, by modifying the filter section as follows?

filter {
  mutate {
    add_field => { "error_tasks"}
  }

  json {
    source => "[error][task]"
    target => "error_tasks"
  }

  mutate {
    remove_field => ["created", "last_updated", "tasks", "transitions", "variables", "watchers", "ancestors", "decorators"]
    rename => {"[metrics][start_time]" => "start_time"}
  }

 # more filters
}

I had assumed the json filter would create the error_tasks field, but maybe it just does nothing if the field does not already exist?

"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 16, column 38 (byte 298) after
 filter {\n  mutate {\n    add_field => { \"error_task_array\""

Logstash did not like my mutate config. You would think add_field would be simple to use...

add_field expects a hash of field name and value pairs, so it would be:

mutate { add_field => { "fieldname" => "value" } }

But the json filter will create the target field; you do not need to use add_field to add it before calling the json filter.
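
For example, a sketch that parses a JSON string out of a field (hypothetical field names):

json {
  source => "some_json_string"
  target => "parsed"
}

If [some_json_string] contains a JSON object, the filter parses it and stores the result under [parsed], creating that field for you.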

Your example data does not appear to have an [error][task] field, so the json filter will be a no-op. Instead you have [error][0][task], [error][1][task] etc.
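
Those indexed paths can be referenced individually, for example with a sprintf reference (a sketch):

mutate {
  add_field => { "first_error_task" => "%{[error][0][task]}" }
}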

Thanks for pointing out the issue with [error][task] vs. [error][0][task].

Would this get all the task values out of the elements of the error JSON array?

json {
  source => "[error][*][task]"
  target => "error_task_array"
}

I'll also try this on my own, but it can take up to 10 minutes between restarting the Logstash service and the cron job running the mongoexport command that generates the JSON file for Logstash to read.

No, you cannot use a wildcard like that. If you need to iterate over all the entries in an array then you would use a ruby filter. But it is really unclear to me why you are using a json filter at all. The [error] field will not exist unless the JSON object has already been parsed, and in that case [error][0][task] etc. will also have been parsed, and you will not need to parse them a second time.
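
If you do want to collect all the task values, a minimal sketch using the ruby filter's inline code option might look like this (untested; the target field name is just a suggestion):

ruby {
  code => '
    errors = event.get("error")
    if errors.is_a?(Array)
      # collect the [task] value from each entry of the array
      event.set("error_task_array", errors.map { |e| e["task"] })
    end
  '
}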

Ah, I was afraid of that.

I've been avoiding the ruby filter because I thought there might be some other filter(s) that would do the job and take me far less time to learn how to use. Using the ruby filter, after all, entails learning a whole new programming language.

Alright, thanks. I'll triple my estimate of how long this will take when my team asks. Unless there's a csharp or java filter coming soon.

Would this code be on the right track for extracting the [error][0][task], [error][1][task], etc. values from the JSON document and storing them into an array?

def filter(event)
  taskArray = Array.new
  errorElements = event.get("[error]")
  if errorElements.is_a? Array
    errorElements.each_index { |x|
      taskArray.push(errorElements[x]["task"])
    }
  end
end

I'll test it eventually but... I'm trying to write up the test right now, using the testing framework that is supposed to be included with this ruby filter.

That looks reasonable. You need to do something with taskArray. Maybe event.set?
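
Something like this at the end of the method (a sketch; the target field name is just a suggestion):

def filter(event)
  taskArray = Array.new
  errorElements = event.get("[error]")
  if errorElements.is_a? Array
    errorElements.each_index { |x|
      taskArray.push(errorElements[x]["task"])
    }
  end
  # store the collected values on the event, then return the event
  event.set("error_task_array", taskArray)
  return [event]
end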

Thanks, I was missing both the event.set call and the return [event].

The filter seems to be executing. I see an error, but it is apparently due to a test failure. I'll ask about that in my other post if I struggle with the test.

I modified the filter method as follows:

def filter(event)
  # Extract the array of task IDs from the error section
  taskArray = Array.new
  errorElements = event.get("[error]")
  if errorElements.is_a? Array
    errorElements.each_index { |x|
      taskArray.push(errorElements[x]["task"])
    }
  end

  # Extract the task variables and set them in error_task_array
  taskArray.each do |taskId|
    if taskId != "job"
      taskVariables = event.get("[tasks][#{taskId}][variables]")
      event.set("[error_task_array][#{taskId}][variables]", taskVariables)
    end
  end
  return [event]
end

I now see the expected JSON in ES:

"error_task_array": {
      "5336": {
        "variables": {
          "incoming": {
            "adapter_id": "Pulse",
            "queryActivedb": "",
            "objId": "$var.job.objId"
          },
          "outgoing": {
            "result": null
          },
          "error": "",
          "decorators": []
        }
      },
      "e5ba": {
        "variables": {
          "incoming": {
            "tr_id": "641097ffe17c19738789b4cb",
            "options": {
              "revertToDefaultValue": true,
              "extractOutput": true,
              "validateIncoming": true
            },
            "variableMap": {
              "message": "$var.c28d.error",
              "status": "error",
              "incomingStatus": "$var.job.wfStatus",
              "node": "",
              "wfName": "Pulse Get SVN By Name"
            }
          },
          "outgoing": {
            "outgoingStatus": null
          },
          "decorators": []
        }
      }
    }

However, I am also seeing some unexpected JSON:

"fields": {
    "error_task_array.5336.variables.incoming.objId": [
      "$var.job.objId"
    ],
    "error_task_array.5336.variables.incoming.adapter_id": [
      "Pulse"
    ],
    "error_task_array.5336.variables.error": [
      ""
    ]
}

How do I prevent these extra JSON objects (OK, they're technically single-element arrays) from being generated in the fields section?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.