Logstash nested JSON help

Hello gurus,

I have never ingested nested JSON before and I'm stuck.

Log examples:

{"system":"aa","logLevel":"[INFO]","log":{"0":"TT000000 joining provisioningEvent and will publish topics: provisioningEvent"}}
{"system":"bbb.ch","cid":"TT000000","sessionId":"1234567890","logLevel":"[INFO]","log":{"0":"clientHandler: sessionData","1.peripheral":"XX:XX:XX:XX:XX:XX","1.type":"E_RT_EEG_DATA_RCVD"}}
{"system":"bbb.ch","cid":"TT000000","sessionId":"1234567890","logLevel":"[INFO]","log":{"0":"clientHandler: sessionData","1.peripheral":"XX:XX:XX:XX:XX:XX","1.type":"E_RA_EEG_DATA_REQ_CMPLT"}}
{"system":"ad","logLevel":"[INFO]","log":{"0":"join","1":"{\"type\":\"socketJoin\",\"CID\":\"TT000000\",\"version\":\"com.xxx.app_xxxx_phone::1.0.0.001\",\"isClient\":true,\"value\":[{\"room\":\"TT000000\",\"publisherOfTopics\":[\"reduxAction\",\"sessionData\",\"sessionEvent\"]},{\"room\":\"provisioningEvent\",\"publisherOfTopics\":[\"provisioningEvent\"]}]}"}}

The errors I'm attempting to resolve:

"error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:193] object mapping for [log.1] tried to parse field [1] as object, but found a concrete value"}}
"error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:225] failed to parse field [log.1] of type [keyword] in document with id 'rsPop5ABzArOD7JdwIMC'. Preview of field's value: '{type=E_RA_EEG_DATA_REQ_CMPLT}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:216"}}

I was attempting to correct it using dynamic_templates:

  "dynamic_templates": [
    {
      "strings_as_text": {
        "match_mapping_type": "string",
        "match": "log*",
        "mapping": {
          "type": "text"
        }
      }
    },
    {
      "objects_as_object": {
        "match_mapping_type": "object",
        "match": "log*",
        "mapping": {
          "type": "object"
        }
      }
    }
  ]

Configs:

input {
  file {
    path => "/logs/*.log"
    start_position => "beginning"
  }
}

filter {

  mutate {
    gsub => ["message", "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}:", ""]
  }

  json {
    skip_on_invalid_json => true
    source => "message"
  }
}

output {
  file {
    path => "/logs/output.txt"
  }

  elasticsearch {
    hosts => ["https://localhost:9200"]
    ssl_certificate_verification => false
    user => "elastic"
    password => "PASSWORD"
    manage_template => false
    data_stream => true
    data_stream_type => "logs"
    data_stream_dataset => "testing-8.11.1"
    data_stream_namespace => "default"
    action => "create"
  }
}

But I'm not gaining any traction; I keep hitting the same errors shown above.
Any help would be greatly appreciated.

Best regards!

Hello and welcome,

You are hitting a mapping conflict: in Elasticsearch you cannot have, in the same index, a field that is an object in some documents and a concrete value in others.

For example, the documents below cannot be indexed into the same index; it has to be one or the other.

{ "field": "concrete value" }

and

{ "field": { "nested_field": "value" } }

Which one is accepted depends on your mappings. People normally hit this issue when they do not have any explicit mapping and let Elasticsearch create the mapping dynamically.

What you have to do is create an explicit mapping for one of the two shapes and adjust your parsing to rename the field when you receive a document with a conflicting structure.

Another option, when you have a dynamic JSON field, is to map it as a flattened field. This will store the entire JSON of the field; the documentation has an example of that.

In this case you could map the log field as flattened until you create explicit mappings for your fields.
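
As a stopgap, something along these lines in an index template could work (the template name and index pattern below are only placeholders, adjust them to match your data stream):

PUT _index_template/my-logs-template
{
  "index_patterns": ["logs-testing-*"],
  "data_stream": {},
  "template": {
    "mappings": {
      "properties": {
        "log": {
          "type": "flattened"
        }
      }
    }
  }
}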

Hello and thank you for replying.

Would you happen to have an example handy somewhere?
I figured it was a mapping issue from reading around, but I haven't been able to get a working config yet.

If the field is [log] and it has nested fields like the ones I've shared, how would I filter that so they do not conflict? And if there are many possible fields like [log][1.type] and other similar ones, would I have to define all the possibilities in a template?

It depends on the document; you may use a ruby filter to check whether the field has a nested value or not, and rename it.

I do not have an example handy, but this is a pretty common question and there are plenty of topics about it in the forum; search for mapping conflict or similar terms.
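
Roughly, the idea would be something like this (an untested sketch; log_renamed is just a placeholder field name):

filter {
  ruby {
    code => "
      log = event.get('log')
      if log.is_a?(Hash)
        log.keys.each do |key|
          value = log[key]
          # Keys containing dots (which Elasticsearch expands into object paths)
          # and values that are themselves objects are moved to [log_renamed]
          # (placeholder name) so they cannot clash with plain string values
          # stored under the same key in other documents.
          if value.is_a?(Hash) || key.include?('.')
            event.set('[log_renamed][' + key.gsub('.', '_') + ']', value)
            log.delete(key)
          end
        end
        event.set('log', log)
      end
    "
  }
}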

It also depends on what you want to do with your data. One alternative, as mentioned, when you have a field with many dynamic nested keys, is to map that field as flattened. This helps with those mapping conflicts, but it has some drawbacks: every leaf value is mapped as a keyword, and some features in Kibana do not fully support flattened fields.

But since Elasticsearch is not schema-less, I would say that explicitly mapping as many fields as you can is better.

One way to deal with these kinds of fields is to map them as flattened and also keep a copy of the field stored as a string; Elastic itself does this for AWS CloudTrail data, for example.

To map a field as flattened you need something like this in your template.

        "fieldname": {
          "type": "flattened"
        }
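
If you also want the string copy mentioned above, one way is to serialize the field in Logstash before it reaches Elasticsearch (an untested sketch; log_raw is just a placeholder field name):

filter {
  ruby {
    init => "require 'json'"
    code => "
      log = event.get('log')
      # Keep the original object in [log] (mapped as flattened) and store a
      # JSON-encoded copy in [log_raw], which can be mapped as keyword or text.
      event.set('log_raw', log.to_json) unless log.nil?
    "
  }
}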

Can you share the template you are using?

Hello and thanks again for some direction.

I was able to solve it with the following filter and index setup.

filter {

    mutate {
      gsub => ["message", "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}:", ""]
    }
 
    json {
      skip_on_invalid_json => true
      source => "message"
      target => "parsed"
    }
    ruby {
      code => "
        parsed_log = event.get('[parsed][log]')
        if parsed_log
          parsed_log.each do |key, value|
            # Flatten [log] into top-level fields, replacing dots in the keys
            # so Elasticsearch does not expand them into conflicting objects.
            new_field_name = 'log_' + key.gsub('.', '_')
            # Store every value as a string; date-time strings pass through unchanged.
            event.set(new_field_name, value.to_s)
          end
        end
        event.remove('[parsed][log]')
      "
    }
    mutate {
      add_field => {
        "system" => "%{[parsed][system]}"
        "cid" => "%{[parsed][cid]}"
        "sessionId" => "%{[parsed][sessionId]}"
        "logLevel" => "%{[parsed][logLevel]}"
      }
    }
    if ![parsed][sessionId] or ![parsed][cid] {
      drop { }
    }
    mutate {
      remove_field => ["parsed", "message"]
    }
}
PUT /logstash-logs-stream
{
  "mappings": {
    "dynamic_templates": [
      {
        "dates_end_with_StartTime": {
          "match_pattern": "regex",
          "match": ".*StartTime$",
          "mapping": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          }
        }
      }
    ]
  }
}

PUT logstash-logs-stream/_settings
{
  "index.mapping.total_fields.limit": 5000
}
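
For the second sample line above, this ends up producing an event whose top-level fields look roughly like this (plus the usual Logstash metadata such as @timestamp), so nothing conflicts under [log] any more:

{"system":"bbb.ch","cid":"TT000000","sessionId":"1234567890","logLevel":"[INFO]","log_0":"clientHandler: sessionData","log_1_peripheral":"XX:XX:XX:XX:XX:XX","log_1_type":"E_RT_EEG_DATA_RCVD"}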

Best regards,
John~

Hello,

I didn't have a template initially. I was letting ES create it automagically when the index is initially created.