Json codec vs. json_lines codec for collecting mongoexport output JSON?

I'm trying to get Logstash to ingest a JSON file created by a mongoexport call. The file looks like this:

{
	"_id": "3c51d008add94422abf107f0",
	"name": "Pulse Get SVN By ID",
	"type": "automation",
	"tasks": {
		"5336": {
			"name": "svnGetByIdHandler",
			"canvasName": "svnGetByIdHandler",
			"summary": "Get SVN by ID",
			"description": "svn.getByIdHandler",
			"location": "Adapter",
			"locationType": "Pulse2",
			"app": "Pulse2",
			"type": "automatic",
			"displayName": "Pulse2",
			"variables": {
				"incoming": {
					"objId": "$var.job.objId",
					"queryActivedb": "",
					"adapter_id": "Pulse"
				},
				"outgoing": {
					"result": null
				},
				"error": "",
				"decorators": []
			},
			"start_time": "2023-08-24T19:59:12.130Z",
			"end_time": 1.692907152253E+12,
			"finish_state": "error"
		}
	},
	"last_updated": {
		"$date": "2023-08-24T19:59:12.261Z"
	}
}

I tried using codec => json in my Logstash pipeline config, but Logstash indexed each line of the file as its own document in Elasticsearch, which is not what we want.

I then tried using codec => json_lines. With that codec the JSON data does not show up in Elasticsearch at all, and there is no indication of a parse error or anything similar in logstash-plain.log.

Is there another codec I should be using? Here is the config:

input {
  file {
    path => "/var/log/mongodb/errored-jobs.json"
    start_position => "beginning"
    codec => json_lines
    sincedb_path => "/dev/null"
  }
}

filter {
  mutate {
    remove_field => ["_id"]
  }
}

output {
  elasticsearch {
    hosts => ["http://ourhost.com:9200"]
    index => "itential-jobs-%{+yyyyMMdd}"
    #user => "elastic"
    #password => "changeme"
  }
}

You will need a multiline codec to combine all the lines of a single object into a single event. You can then parse it using a json filter.
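
For intuition, here is a minimal Ruby sketch of the grouping step a multiline codec would perform. It is not the codec's implementation. It assumes, based on the sample above, that mongoexport --pretty writes the opening brace of every document at column 0 and indents everything nested. The helper name group_pretty_printed and the two sample documents are made up for illustration.

```ruby
require 'json'

# Group raw lines into one string per document: a line that starts with "{"
# at column 0 opens a new group, every other line joins the current group.
def group_pretty_printed(text)
  groups = []
  text.each_line do |line|
    groups << [] if line.start_with?("{") || groups.empty?
    groups.last << line
  end
  groups.map(&:join)
end

# Two hypothetical documents, pretty-printed back to back like the export file.
docs = [
  { "name" => "job-a", "tasks" => { "5336" => { "type" => "automatic" } } },
  { "name" => "job-b", "tasks" => {} }
]
pretty_export = docs.map { |d| JSON.pretty_generate(d) }.join("\n") + "\n"

# Once the lines are regrouped, each group parses as a complete JSON document.
events = group_pretty_printed(pretty_export)
parsed = events.map { |event| JSON.parse(event) }
puts "#{events.size} events, parsed back to the original documents: #{parsed == docs}"
```

In Logstash terms this roughly corresponds to a multiline codec with pattern => "^\{", negate => true and what => "previous", followed by a json filter with source => "message". Treat those settings as a starting point to check against your own file, not as a tested configuration. The codec's auto_flush_interval option may also be needed so that the last document in the file is emitted.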

Is the file pretty-printed, or do you have one JSON document per line? I'm not familiar with mongoexport, but from its documentation it seems able to export one JSON document per line, which is what Logstash expects. Can you share the mongoexport command line you are running?

The json_lines codec should not be used with line-oriented inputs such as the file input. This is stated in the documentation:

Do not use this codec if your source input is line-oriented JSON, for example, redis or file inputs. Rather, use the json codec.

You should use the json codec, but it needs one JSON document per line. If you cannot export your data that way, you will need to use a multiline codec.

I do not understand what is meant by "line oriented input" or "line oriented json" in this context.

I will try the multiline codec and see if I have better luck with that.

Here is the mongoexport command:

mongoexport --db=itential --collection=jobs --query='{ "status": "error"}' --sort='{ "last_updated": -1}' --limit=10 --out=errored-jobs.json --pretty

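The json codec expects a single line of text; that is what line-oriented means here. The file input already splits the file into lines and hands them to the codec one at a time. The json_lines codec instead expects a text buffer, from which it will parse out \n delimited lines, and then parse each one as JSON.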
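
A small Ruby sketch may make the difference concrete. It only imitates what a line-oriented input does (one line at a time goes to the JSON parser) and is not Logstash code. The document contents and the helper name parse_line_by_line are illustrative assumptions.

```ruby
require 'json'

# A cut-down, hypothetical version of the exported document.
doc = { "name" => "Pulse Get SVN By ID", "tasks" => { "5336" => { "type" => "automatic" } } }

# Roughly what a line-oriented input does: every line is parsed on its own.
def parse_line_by_line(text)
  text.each_line.map do |line|
    begin
      JSON.parse(line)
    rescue JSON::ParserError
      :parse_failure
    end
  end
end

# Pretty-printed export: no single line is a complete JSON document.
pretty_results = parse_line_by_line(JSON.pretty_generate(doc))

# One document per line: the line parses as a whole document.
compact_results = parse_line_by_line(JSON.generate(doc) + "\n")

puts "pretty-printed: #{pretty_results.count(:parse_failure)} of #{pretty_results.size} lines failed to parse"
puts "one document per line: #{compact_results.size} event(s) parsed"
```

With the pretty-printed text every line becomes its own failed event, which matches the "each line as its own document" behaviour described earlier in the thread.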

My errored-jobs.json file is delimited by line feeds. That's why I thought json_lines was the correct codec to use for it. But I guess a file is not a text buffer.

Remove the --pretty argument. It pretty-prints the JSON across multiple lines, which is what you do not want here.

Without it, mongoexport should write one JSON document per line, which is what Logstash and the json codec expect.

Thanks. Removing --pretty resulted in a JSON file that Logstash attempted to parse this time, instead of not doing anything with it.

Looks like my next step is to decide on a solution for the mapping explosion. I'm grateful this article was posted. Will investigate and report back.

From the document you shared, it looks like the tasks field could be an issue, so you will probably need to map this field as a flattened field before creating your index.

This makes Elasticsearch store the entire JSON of the tasks object while creating just one mapping entry for it.

Thanks for that.

Just to be clear, I need to do all the mapping setup in Dev Tools Console, and not in the filter section of my logstash pipeline config file, correct?

Also, all the examples I can find in the mapping documentation use explicit index names instead of an index pattern. We would like to ingest the MongoDB export JSON files into daily indices named like "itential-jobs-%{+yyyyMMdd}", so we need a mapping that applies to the whole pattern rather than just, say, itential-jobs-20230824.

Ok, I ran this in Dev Tools Console

PUT itential-jobs-*/_mapping
{
  "properties": {
    "tasks": {
      "type": "flattened"
    }
  }
}

I then ran mongoexport to create a new JSON file and ran Logstash. I saw this error in logstash-plain.log:

"error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [error.message] cannot be changed from type [text] to [ObjectMapper]"}}}

I guess I need to delete the itential-jobs-* index pattern, recreate it, rerun the above PUT, then try again.

Yes, you would need to reindex.

I was unfamiliar with flattened mappings since I no longer use elasticsearch, so this is the approach I would have taken. In a data set with so many text fields it may not make a lot of difference, but there will be other use cases where it is more useful.

If you have incoming JSON like this

     "tasks" => {
    "5336" => {
        "description" => "svn.getByIdHandler",
         "canvasName" => "svnGetByIdHandler",
           "end_time" => 1646000000000.0,
            "summary" => "Get SVN by ID",
               "name" => "svnGetByIdHandler"
    },
       "1" => {
        "description" => "wacky",
         "canvasName" => "Sidney",
           "end_time" => 1692907152253.0,
            "summary" => "Boo!",
               "name" => "foo"
    }
},

then every time you add a task you add five new fields to the index (20+ with your original data structure). You can use a ruby filter

    ruby {
        code => '
            begin
                # Copy each task key into its own hash as a "task" field
                event.get("tasks").each { |k, v| event.set("[tasks][#{k}][task]", k) }

                # This converts a hash { "key" => "value" } to [ [ "key", "value" ] ]
                t = event.get("tasks").to_a
                # Next we throw away the keys, since each one is now stored in the "task" field
                newTasks = []
                t.each { |x| newTasks << x[1] }
                event.set("tasks", newTasks)
            rescue
                # Leave the event unchanged if "tasks" is missing or is not a hash
            end
        '
    }

which will produce

     "tasks" => [
    [0] {
           "end_time" => 1646000000000.0,
        "description" => "svn.getByIdHandler",
               "task" => "5336",
         "canvasName" => "svnGetByIdHandler",
            "summary" => "Get SVN by ID",
               "name" => "svnGetByIdHandler"
    },
    [1] {
           "end_time" => 1692907152253.0,
        "description" => "wacky",
               "task" => "1",
         "canvasName" => "Sidney",
            "summary" => "Boo!",
               "name" => "foo"
    }
],

where the number of fields does not grow with the number of tasks.

I tried deleting the index itential-jobs-* again.

Then I tried recreating it with this mapping:

PUT /itential-jobs-20230826
{
	"mappings": {
		"properties": {
			"name": {
				"type": "text"
			},
			"error": {
				"properties": {
					"task": {
						"type": "text"
					},
					"message": {
						"properties": {
							"response": {
								"properties": {
									"errors": {
										"properties": {
											"message": {
												"type": "text"
											},
											"decoded_msg": {
												"type": "text"
											}
										}
									}
								}
							}
						}
					}
				}
			}
		}
	}
}

Then I re-exported the JSON data and restarted Logstash. I see this error now.

 "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [error.message] tried to parse field [message] as object, but found a concrete value"}

But in the relevant section of the JSON data being parsed, message looks like an object to me, unless I misunderstand what "concrete value" means:

"error": [
		{
			"task": "5336",
			"message": {
				"icode": "AD.500",
				"IAPerror": {
					"origin": "Pulse-connectorRest-handleEndResponse",
					"displayString": "Error 404 received on request",
					"recommendation": "Verify the request is accurate via debug logs and postman",
					"code": 404
				},
				"metrics": {
					"code": 404,
					"timeouts": 0,
					"redirects": 0,
					"retries": 0,
					"tripTime": 48,
					"isThrottling": false,
					"capabilityTime": "64ms"
				},
				"response": {
					"data": [],
					"errors": [
						{
							"source": "",
							"errors": [],
							"severity": 0,
							"silent": 0,
							"message": "CODE~PULSE~NodeAPI~notFound|SVN|123",
							"decoded_msg": "SVN with id 123 does not exist"
						}
					],
					"meta": {
						"count": 0
					}
				}
			},
			"timestamp": 1.693088895732E+12
		}
	]

So is the answer to define mappings for icode, IAPerror, and so forth?

Alright I tried this mapping and made a little more progress.

PUT /itential-jobs-20230828
{
	"mappings": {
		"dynamic": "false",
		"properties": {
			"error": {
				"properties": {
					"task": {
						"type": "text"
					},
					"message": {
						"properties": {
							"icode": {
								"type": "text"
							},
							"IAPerror": {
								"type": "flattened"
							},
							"metrics": {
								"type": "flattened"
							},
							"response": {
								"properties": {
									"errors": {
										"properties": {
											"decoded_msg": {
												"type": "text"
											},
											"message": {
												"type": "text"
											}
										}
									}
								}
							}
						}
					},
					"timestamp": {
						"type": "date_nanos"
					}
				}
			},
			"name": {
				"type": "text"
			}
		}
	}
}

But ES does not like the date_nanos type for timestamp (above). The timestamp looks like this in the data:

"timestamp": 1.69324948E+12

Tried changing the type to "double" which got rid of date parsing error, but now I'm back to the same error about [error.message] being a concrete value instead of an object.
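
As a side note, the timestamp values appear to be epoch milliseconds written in scientific notation. If a real date field is wanted later, one option is to convert the value before indexing, for example inside a ruby filter. The following is a minimal standalone Ruby sketch of that conversion; the helper name epoch_millis_to_iso8601 is made up, and the assumption that the value is in milliseconds comes only from comparing it with the dates elsewhere in the sample data.

```ruby
require 'time'

# Convert an epoch-milliseconds value (integer or float) to an ISO 8601 UTC string.
def epoch_millis_to_iso8601(value)
  millis = value.round
  Time.at(millis / 1000, millis % 1000, :millisecond).utc.iso8601(3)
end

# Values copied from the sample data in this thread.
puts epoch_millis_to_iso8601(1.69324948E+12)      # 2023-08-28T19:04:40.000Z
puts epoch_millis_to_iso8601(1.692907152253E+12)  # 2023-08-24T19:59:12.253Z
```

The second value lands within a fraction of a second of the start_time shown in the first sample document, which supports the milliseconds reading.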

I did not scroll down far enough in the generated JSON data. error.message is of type string instead of object in one of the elements under error:

"error": [
		{
			"task": "5336",
			"message": {
				"icode": "AD.500",
				"IAPerror": {
					"origin": "Pulse-connectorRest-handleEndResponse",
					"displayString": "Error 404 received on request",
					"recommendation": "Verify the request is accurate via debug logs and postman",
					"code": 404,
					"raw_response": {
						"status": "success",
						"code": 404,
						"headers": {
							"server": "nginx/1.22.1",
							"date": "Mon, 28 Aug 2023 19:04:39 GMT",
							"content-type": "application/json; charset=utf-8",
							"transfer-encoding": "chunked",
							"connection": "close",
							"vary": "Origin, Accept-Encoding",
							"access-control-allow-credentials": "true",
							"x-xss-protection": "1; mode=block",
							"x-frame-options": "DENY",
							"x-download-options": "noopen",
							"x-content-type-options": "nosniff",
							"set-cookie": [
								"SERVERID=serv3; path=/"
							],
							"cache-control": "private"
						},
						"response": "{\"data\":[],\"errors\":[{\"source\":\"\",\"errors\":[],\"severity\":0,\"silent\":0,\"message\":\"CODE~PULSE~NodeAPI~notFound|SVN|123\",\"decoded_msg\":\"SVN with id 123 does not exist\"}],\"meta\":{\"count\":0}}",
						"redirects": 0,
						"tripTime": "46ms",
						"reqHdr": {
							"Content-Type": "application/json",
							"Accept": "application/json",
							"Authorization": "****"
						}
					}
				},
				"metrics": {
					"code": 404,
					"timeouts": 0,
					"redirects": 0,
					"retries": 0,
					"tripTime": 46,
					"isThrottling": false,
					"capabilityTime": "102ms"
				},
				"response": {
					"data": [],
					"errors": [
						{
							"source": "",
							"errors": [],
							"severity": 0,
							"silent": 0,
							"message": "CODE~PULSE~NodeAPI~notFound|SVN|123",
							"decoded_msg": "SVN with id 123 does not exist"
						}
					],
					"meta": {
						"count": 0
					}
				}
			},
			"timestamp": 1.69324948E+12
		},
		{
			"task": "e5ba",
			"message": {
				"code": 500,
				"message": {
					"apiVersion": "1",
					"method": "transformations.run",
					"error": {
						"code": 500,
						"message": "Incoming schema validation errors:\n\nschema $id: message\nerror: data should be object",
						"errors": [
							{}
						]
					}
				}
			},
			"timestamp": 1.693249480067E+12
		},
		{
			"task": "job",
			"message": "Job has no available transitions. eb40, 78e, 8f2c, 4b8b could have led to the workflow end task, but did not. These tasks performed in a way that the end of the workflow could not be reached.",
			"timestamp": 1.69324948008E+12
		}
	]

I tried modifying my mapping like this:

"error": {
				"properties": {
					"task": {
						"type": "text"
					},
					"message": {
						"type": [
							"string", "object"
						]

However, the ES console did not like that:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "mapper_parsing_exception",
        "reason" : "No handler for type [[string, object]] declared on field [message]"
      }
    ],
    "type" : "mapper_parsing_exception",
    "reason" : "Failed to parse mapping [_doc]: No handler for type [[string, object]] declared on field [message]",
    "caused_by" : {
      "type" : "mapper_parsing_exception",
      "reason" : "No handler for type [[string, object]] declared on field [message]"
    }
  },
  "status" : 400
}

That is not possible: a field can have only one data type. There is also no string data type; string fields are mapped as text or keyword.

My suggestion is the same as in the previous answer: you need to analyse your documents and check which fields can cause a mapping explosion.

In this case, it seems that the error field also can have many different fields and those fields can change type, so you should try to map error as flattened as well.

I would also suggest that you first extract a sample of documents from your mongodb and analyse the structure to create the mapping.

This is the mapping that finally got Logstash and ES to ingest the JSON generated by mongoexport.

PUT /itential-jobs-20230828
{
	"mappings": {
		"dynamic": "false",
		"properties": {
			"tasks": {
				"type": "flattened"
			},
			"transitions": {
				"type": "flattened"
			},
			"variables": {
				"type": "flattened"
			},
			"error": {
				"type": "flattened"
			},
			"name": {
				"type": "text"
			}
		}
	}
}

Thanks for all the help. I'm sure I'll be asked to refine my Logstash usage more down the road, but just getting some sort of error data into ES was an essential first step for us.

We're getting a mapping explosion again.

I think it's because the mapping only applied to the index itential-jobs-20230828

We have the Logstash Elasticsearch output set to itential-jobs-%{+yyyyMMdd}. So when documents are indexed to itential-jobs-20230831 the mapping does not apply to them.

Any suggestions other than to manually run that PUT command in Dev Tools every day?

You need to create an index template for your index pattern with your mappings.

Check the documentation about index templates.
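
As a rough illustration, the Ruby sketch below builds a composable index template body from the mapping that worked earlier in this thread and prints it as a request that could be pasted into Dev Tools. It assumes an Elasticsearch version that supports the _index_template API (7.8 or later); on older versions the legacy _template API has a different body layout. The template name itential-jobs is a made-up example.

```ruby
require 'json'

# Hypothetical template name; any name works as long as the pattern matches.
TEMPLATE_NAME = "itential-jobs"

# Body for PUT _index_template/<name>: the pattern covers every daily index
# that the Logstash output creates, so each new index gets these mappings.
def index_template_body
  {
    "index_patterns" => ["itential-jobs-*"],
    "template" => {
      "mappings" => {
        "dynamic" => "false",
        "properties" => {
          "tasks"       => { "type" => "flattened" },
          "transitions" => { "type" => "flattened" },
          "variables"   => { "type" => "flattened" },
          "error"       => { "type" => "flattened" },
          "name"        => { "type" => "text" }
        }
      }
    }
  }
end

# Print the request in Dev Tools console form.
puts "PUT _index_template/#{TEMPLATE_NAME}"
puts JSON.pretty_generate(index_template_body)
```

A template only applies to indices created after it is put in place, so indices that already exist keep their current mappings.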
