Problem with Elasticsearch "document_parsing_exception" on field type

Hello, I have run into a problem that many Elastic users seem to face, but I have not found an answer yet and do not understand how to solve it without losing messages sent to Elasticsearch.
I use Filebeat to connect to AWS CloudTrail and receive logs.

I noticed that Elasticsearch was not receiving all log messages; only some of them arrived.
I started analyzing the Logstash logs and saw the following error messages:

"status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:3774] failed to parse field [requestParameters.DescribeVpcEndpointsRequest] of type [text] in document with id '0Qe4oo4BOD3coVYthBSs'. Preview of field's value: '{MaxResults=1000}'", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:3756"}

or

"status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:3057] object mapping for [requestParameters.filter] tried to parse field [null] as object, but found a concrete value"}

As far as I understand, certain fields can be text in one case and an object in another.
How can this problem be solved? Is it possible to specify a universal field type, or to not determine the field type for requestParameters.filter in the JSON data at all and leave everything it contains in its original form when the field type cannot be determined dynamically?
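To illustrate the conflict described above, here is a toy Python model. It is not Elasticsearch's actual logic, only a sketch of the idea: the first document to arrive decides whether a field holds a concrete value or an object, and later documents with the other shape are rejected. The function names and sample values are made up for illustration.

```python
def json_kind(value):
    """Classify a JSON value as 'object' or 'concrete' (anything else)."""
    return "object" if isinstance(value, dict) else "concrete"


def index_all(docs, field):
    """Toy model of dynamic mapping: the first document that contains
    `field` fixes its kind; later documents with another kind are rejected."""
    mapped_kind = None
    accepted, rejected = [], []
    for doc in docs:
        if field not in doc:
            accepted.append(doc)
            continue
        kind = json_kind(doc[field])
        if mapped_kind is None:
            mapped_kind = kind  # the mapping is created here
        if kind == mapped_kind:
            accepted.append(doc)
        else:
            rejected.append(doc)  # comparable to a 400 document_parsing_exception
    return accepted, rejected


# Hypothetical sample events: the same field arrives in two shapes.
docs = [
    {"filter": "vpc-id=vpc-123"},    # concrete value, seen first
    {"filter": {"name": "vpc-id"}},  # object, conflicts with the mapping
    {"filter": "state=available"},   # concrete value again
]
accepted, rejected = index_all(docs, "filter")
print(len(accepted), "accepted,", len(rejected), "rejected")
```

This matches the symptom in the question: some events are indexed and others are silently missing, depending on which shape was seen first.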

I would be grateful for any help.

You need to map the top-level field requestParameters as a flattened field; this way the entire JSON object of this field will be stored.
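As a rough illustration of why a flattened field avoids the conflict: Elasticsearch indexes the leaf values of a flattened object as keywords, so it does not need a separate type for each sub-field. The Python sketch below imitates that idea. flatten_leaves is a hypothetical helper, not part of any Elastic library, and the sample values are adapted from the error message above.

```python
def flatten_leaves(obj, prefix=""):
    """Return (dotted_path, string_value) pairs for every leaf of a JSON value."""
    if isinstance(obj, dict):
        pairs = []
        for key, value in obj.items():
            path = f"{prefix}.{key}" if prefix else key
            pairs.extend(flatten_leaves(value, path))
        return pairs
    if isinstance(obj, list):
        pairs = []
        for item in obj:
            pairs.extend(flatten_leaves(item, prefix))
        return pairs
    # Every leaf is treated as a plain string, whatever its original type.
    return [(prefix, str(obj))]


# The same sub-field once as an object and once as a string.
as_object = {"DescribeVpcEndpointsRequest": {"MaxResults": 1000}}
as_string = {"DescribeVpcEndpointsRequest": "MaxResults=1000"}

print(flatten_leaves(as_object))
print(flatten_leaves(as_string))
```

Both shapes reduce to string leaves under one field, so neither forces a text-versus-object decision on DescribeVpcEndpointsRequest.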

This is how Elastic does it in the CloudTrail integration.

They also copy the object field to another field to also keep the field as a string.

You can check how the ingest pipeline processes the logs here.

Are you using Logstash? I have a filter for the 4 fields in CloudTrail logs that are dynamic; maybe you can adapt it for your use case.

Hi leandrojmp,

thanks for your reply.
To connect to AWS I use Filebeat: it connects using the cloudtrail module and fetches the logs I need. I then send the data to Logstash to parse the JSON messages and forward it to Elasticsearch.

I would be glad if you shared your ideas.

Can you send it directly to Elasticsearch? If you do, you will not need to worry about the parsing and will not have this issue.

In Logstash I use the following filters to deal with the dynamic fields from CloudTrail; you may adapt them to your pipeline. My CloudTrail JSON has its fields nested under a field named json.

# requestParameters, responseElements, additionalEventData and serviceEventDetails can be dynamic and lead to mapping conflicts
#
# the following filter performs these steps:
# - remove the field if it is empty
# - double check if the field exists and it is not empty
# - if true, mutate/add_field will create a new field with the json object as a string
# - if true, mutate/rename will rename the field as a nested field inside aws.cloudtrail.flattened, which needs to be mapped as flattened
#
filter {
    ruby {
        code => '
            event.remove("[json][requestParameters]") if event.get("[json][requestParameters]").nil?
            event.remove("[json][responseElements]") if event.get("[json][responseElements]").nil?
            event.remove("[json][additionalEventData]") if event.get("[json][additionalEventData]").nil?
            event.remove("[json][serviceEventDetails]") if event.get("[json][serviceEventDetails]").nil?
        '
    }
    if [json][requestParameters] and [json][requestParameters] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][request_parameters]" => "%{[json][requestParameters]}"
            }
        }
        mutate {
            rename => {
                "[json][requestParameters]" => "[aws][cloudtrail][flattened][request_parameters]"
            }
        }

    }
    if [json][responseElements] and [json][responseElements] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][response_elements]" => "%{[json][responseElements]}"
            }
        }
        mutate {
            rename => {
                "[json][responseElements]" => "[aws][cloudtrail][flattened][response_elements]"
            }
        }
    }
    if [json][additionalEventData] and [json][additionalEventData] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][additional_eventdata]" => "%{[json][additionalEventData]}"
            }
        }
        mutate {
            rename => {
                "[json][additionalEventData]" => "[aws][cloudtrail][flattened][additional_eventdata]"
            }
        }
    }
    if [json][serviceEventDetails] and [json][serviceEventDetails] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][service_event_details]" => "%{[json][serviceEventDetails]}"
            }
        }
        mutate {
            rename => {
                "[json][serviceEventDetails]" => "[aws][cloudtrail][flattened][service_event_details]"
            }
        }
    }
}
#

This basically replicates this logic from the Elasticsearch ingest pipeline.
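For readers who find the Logstash syntax hard to follow, the same steps can be sketched in plain Python on a dictionary. This is only an approximation of the filter above: split_dynamic_fields is a hypothetical name, and json.dumps stands in for the string that "%{[field]}" produces in Logstash, which I am assuming is a JSON rendering of the object.

```python
import json

# Source field under [json] -> target name under [aws][cloudtrail]
DYNAMIC_FIELDS = {
    "requestParameters": "request_parameters",
    "responseElements": "response_elements",
    "additionalEventData": "additional_eventdata",
    "serviceEventDetails": "service_event_details",
}


def split_dynamic_fields(event):
    """Keep each dynamic field twice: once as a string, once as the raw
    object under aws.cloudtrail.flattened (to be mapped as flattened)."""
    source = event.get("json", {})
    cloudtrail = event.setdefault("aws", {}).setdefault("cloudtrail", {})
    for src_name, dst_name in DYNAMIC_FIELDS.items():
        value = source.pop(src_name, None)  # removes the original field
        if value is None or value == "":
            continue  # empty fields are dropped, as in the ruby block
        # String copy (assumed equivalent of mutate/add_field).
        cloudtrail[dst_name] = value if isinstance(value, str) else json.dumps(value)
        # Object copy (equivalent of mutate/rename).
        cloudtrail.setdefault("flattened", {})[dst_name] = value
    return event


event = {"json": {"requestParameters": {"maxResults": 1000}, "responseElements": None}}
print(json.dumps(split_dynamic_fields(event), indent=2))
```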


Leandro Pereira, thanks for the code!

I also thought about sending the data directly, but I am not sure the CloudTrail message itself would be parsed in that case, since it is originally stored in JSON format. One event contains a lot of information, and when I did not parse it and simply passed it through Logstash, the message arrived as raw JSON, just as it is stored on S3.
Are you saying that Filebeat itself parses the JSON message into the required fields? Or do I still need to use a separate module in Filebeat?

Modules in Filebeat use ingest pipelines in Elasticsearch to parse the data, so if you are using a module the data is expected to be parsed, but this is done in Elasticsearch, not in Filebeat.

I do not use Filebeat anymore as I'm using the Elastic Agent, and the Elastic Agent integrations also use ingest pipelines in Elasticsearch to parse your data.

You can check the available integrations here; there is one for AWS CloudTrail, with instructions on how to configure it.

If you want to see the ingest pipelines used, you can check them here.


Leandro Pereira, thanks

I checked: I sent the logs directly to Elasticsearch and it works.
Indeed, aws.cloudtrail.request_parameters is saved as an object. I also remembered why I routed this data through Logstash: with Logstash I discarded messages containing /CloudTrail-Digest/ and added information from a dictionary based on the recipientAccountId. Now I do not know how best to do this.

I tried to use your scheme, but I still could not parse the fields dynamically. Or perhaps this scheme only works for a data_stream and not for regular index creation.

"status"=>400, "error"=>{"type"=>"document_parsing_exception", "reason"=>"[1:2091] object mapping for [aws.cloudtrail.flattened.request_parameters.DescribeVpcEndpointsRequest] tried to parse field [DescribeVpcEndpointsRequest] as object, but found a concrete value"}

or

 "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [aws.cloudtrail.flattened.request_parameters.DescribeTransitGatewaysRequest.Filter.Value.content] cannot be changed from type [long] to [text]"}

Apparently I'm still doing something wrong or haven't foreseen something...

Did you create the mapping in your template before sending the data?

What does your template look like?

/usr/share/filebeat/module/aws/cloudtrail/ingest/pipeline.yml

  - rename:
      field: "json.errorMessage"
      target_field: "aws.cloudtrail.error_message"
      ignore_failure: true
  - script:
      lang: painless
      source: |
        if (ctx.aws.cloudtrail?.flattened == null) {
          Map map = new HashMap();
          ctx.aws.cloudtrail.put("flattened", map);
        }
        if (ctx.json?.requestParameters != null) {
          ctx.aws.cloudtrail.request_parameters = ctx.json.requestParameters.toString();
          if (ctx.aws.cloudtrail.request_parameters.length() < 32766) {
            ctx.aws.cloudtrail.flattened.put("request_parameters", ctx.json.requestParameters);
          }
        }
        if (ctx.json?.responseElements != null) {
          ctx.aws.cloudtrail.response_elements = ctx.json.responseElements.toString();
          if (ctx.aws.cloudtrail.response_elements.length() < 32766) {
            ctx.aws.cloudtrail.flattened.put("response_elements", ctx.json.responseElements);
          }
        }
        if (ctx.json?.additionalEventData != null) {
          ctx.aws.cloudtrail.additional_eventdata = ctx.json.additionalEventData.toString();
          if (ctx.aws.cloudtrail.additional_eventdata.length() < 32766) {
            ctx.aws.cloudtrail.flattened.put("additional_eventdata", ctx.json.additionalEventData);
          }
        }
        if (ctx.json?.serviceEventDetails != null) {
          ctx.aws.cloudtrail.service_event_details = ctx.json.serviceEventDetails.toString();
          if (ctx.aws.cloudtrail.service_event_details.length() < 32766) {
            ctx.aws.cloudtrail.flattened.put("service_event_details", ctx.json.serviceEventDetails);
          }
        }
      ignore_failure: true
  - rename:
      field: "json.requestID"
      target_field: "aws.cloudtrail.request_id"
      ignore_failure: true

Logstash config:

    json {
        source => "message"
    }
    ruby {
        code => '
            event.remove("[requestParameters]") if event.get("[requestParameters]").nil?
            event.remove("[responseElements]") if event.get("[responseElements]").nil?
            event.remove("[additionalEventData]") if event.get("[additionalEventData]").nil?
            event.remove("[serviceEventDetails]") if event.get("[serviceEventDetails]").nil?
        '
    }
    if [requestParameters] and [requestParameters] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][request_parameters]" => "%{[requestParameters]}"
            }
        }
        mutate {
            rename => {
                "[requestParameters]" => "[aws][cloudtrail][flattened][request_parameters]"
            }
        }

    }
    if [responseElements] and [responseElements] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][response_elements]" => "%{[responseElements]}"
            }
        }
        mutate {
            rename => {
                "[responseElements]" => "[aws][cloudtrail][flattened][response_elements]"
            }
        }
    }
    if [additionalEventData] and [additionalEventData] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][additional_eventdata]" => "%{[additionalEventData]}"
            }
        }
        mutate {
            rename => {
                "[additionalEventData]" => "[aws][cloudtrail][flattened][additional_eventdata]"
            }
        }
    }
    if [serviceEventDetails] and [serviceEventDetails] != "" {
        mutate {
            add_field => {
                "[aws][cloudtrail][service_event_details]" => "%{[serviceEventDetails]}"
            }
        }
        mutate {
            rename => {
                "[serviceEventDetails]" => "[aws][cloudtrail][flattened][service_event_details]"
            }
        }
    }

My logs do not have the fields nested under an embedded json field.
I'm wondering whether I can write a Logstash config so that json { source => "message" } does not parse the contents of the following fields into sub-fields:
requestParameters.xxx
responseElements.xxx
additionalEventData.xxx
serviceEventDetails.xxx

Hello,

This does not answer the question: did you create the mapping, or a template to apply your mapping?

The mapping is done in Elasticsearch, and you didn't share anything related to it. You need to create the correct mapping for the flattened fields before indexing any data.
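As a minimal sketch of what "create the mapping first" could look like, the Python below only builds and prints a request body; it does not call Elasticsearch. It assumes the composable index template format (the body of PUT _index_template/<name>); a legacy template would carry "mappings" at the top level instead. The index pattern "cloudtrail-*" and the function name are placeholders, not something taken from this thread.

```python
import json

# The four dynamic CloudTrail fields that should be mapped as flattened.
FLATTENED_FIELDS = [
    "additional_eventdata",
    "request_parameters",
    "response_elements",
    "service_event_details",
]


def build_index_template(index_pattern):
    """Build an index template body that maps the dynamic fields under
    aws.cloudtrail.flattened as type flattened."""
    flattened_props = {name: {"type": "flattened"} for name in FLATTENED_FIELDS}
    return {
        "index_patterns": [index_pattern],
        "template": {
            "mappings": {
                "properties": {
                    "aws": {"properties": {
                        "cloudtrail": {"properties": {
                            "flattened": {"properties": flattened_props}
                        }}
                    }}
                }
            }
        },
    }


template = build_index_template("cloudtrail-*")  # hypothetical index pattern
print(json.dumps(template, indent=2))
```

A template only applies to indices created after it exists, so an index that already has conflicting mappings would need to be recreated or rolled over before the new mapping takes effect.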


Leandro Pereira, sorry.
At first I didn't quite understand what you were asking about.
I'm using this template, but it's an old version.

{
  "_routing": {
    "required": false
  },
  "numeric_detection": false,
  "dynamic_date_formats": [
    "strict_date_optional_time",
    "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
  ],
  "_source": {
    "excludes": [],
    "includes": [],
    "enabled": true
  },
  "dynamic": true,
  "dynamic_templates": [],
  "date_detection": true,
  "properties": {
    "requestParameters.filter": {
      "type": "text"
    },
    "aws.cloudtrail.flattened.request_parameters.DescribeTransitGatewaysRequest.Filter.Value.content": {
      "type": "text"
    },
    "apiVersion": {
      "eager_global_ordinals": false,
      "index_phrases": false,
      "fielddata": false,
      "norms": true,
      "index": true,
      "store": false,
      "type": "text",
      "index_options": "positions"
    },
    "requestParameters.maxResults": {
      "coerce": true,
      "index": true,
      "ignore_malformed": false,
      "store": false,
      "type": "long",
      "doc_values": true
    },
    "requestParameters.tagSpecificationSet.items.tags.value": {
      "eager_global_ordinals": false,
      "index_phrases": false,
      "fielddata": false,
      "norms": true,
      "index": true,
      "store": false,
      "type": "text",
      "index_options": "positions"
    },
    "requestParameters.DescribeEgressOnlyInternetGatewaysRequest": {
      "eager_global_ordinals": false,
      "index_phrases": false,
      "fielddata": false,
      "norms": true,
      "index": true,
      "store": false,
      "type": "text",
      "index_options": "positions"
    },
    "responseElements.credentials.sessionToken": {
      "eager_global_ordinals": false,
      "index_phrases": false,
      "fielddata": false,
      "norms": true,
      "index": true,
      "store": false,
      "type": "text",
      "index_options": "positions"
    },
    "message": {
      "type": "text"
    },
    "requestParameters.domainName": {
      "eager_global_ordinals": false,
      "index_phrases": false,
      "fielddata": false,
      "norms": true,
      "index": true,
      "store": false,
      "type": "text",
      "index_options": "positions"
    },
    "requestParameters.DescribeFlowLogsRequest": {
      "type": "text"
    },
    "requestParameters.maxItems": {
      "type": "text"
    },
    "tags": {
      "type": "text"
    }
  }
}

You suggest redefining new fields:
[aws][cloudtrail][flattened][request_parameters]
[aws][cloudtrail][flattened][response_elements]
[aws][cloudtrail][flattened][additional_eventdata]
[aws][cloudtrail][flattened][service_event_details]

What type should I use? Text?

I created a new template. This is its mappings section:

{
  "_routing": {
    "required": false
  },
  "numeric_detection": false,
  "dynamic_date_formats": [
    "strict_date_optional_time",
    "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
  ],
  "_source": {
    "excludes": [],
    "includes": [],
    "enabled": true
  },
  "dynamic": true,
  "dynamic_templates": [],
  "date_detection": true,
  "properties": {
    "apiVersion": {
      "eager_global_ordinals": false,
      "index_phrases": false,
      "fielddata": false,
      "norms": true,
      "index": true,
      "store": false,
      "type": "text",
      "index_options": "positions"
    },
    "aws.cloudtrail.flattened.additional_eventdata": {
      "type": "flattened"
    },
    "aws.cloudtrail.flattened.service_event_details": {
      "type": "flattened"
    },
    "message": {
      "type": "text"
    },
    "aws.cloudtrail.flattened.response_elements": {
      "type": "flattened"
    },
    "tags": {
      "type": "text"
    },
    "aws.cloudtrail.flattened.request_parameters": {
      "type": "flattened"
    }
  }
}

Is this what you mean?

Yes, but your mappings are wrong.

It needs to be something like this:

        "aws": {
          "properties": {
            "cloudtrail": {
              "properties": {
                "flattened": {
                  "properties": {
                    "additional_eventdata": { "type":"flattened" },
                    "request_parameters": { "type":"flattened" },
                    "response_elements": { "type":"flattened" },
                    "service_event_details": { "type":"flattened" }
                  }
                },
                "additional_eventdata": { 
                  "type": "keyword",
                  "fields":{
                    "text": {
                      "type": "text"
                    }
                  }
                },
                "request_parameters": { 
                  "type": "keyword",
                  "fields":{
                    "text": {
                      "type": "text"
                    }
                  }
                },
                "response_elements": { 
                  "type": "keyword",
                  "fields":{
                    "text": {
                      "type": "text"
                    }
                  }
                },
                "service_event_details": { 
                  "type": "keyword",
                  "fields":{
                    "text": {
                      "type": "text"
                    }
                  }
                }
              }
            }
          }
        }
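If you already have a template written with dotted property names, a small helper can rewrite it into the nested "properties" layout shown above. This is just a sketch: nest_properties is a hypothetical function, and it does not handle the case where one dotted name is a prefix of another.

```python
import json


def nest_properties(dotted):
    """Convert {'a.b.c': mapping} into {'a': {'properties': {'b': ...}}}."""
    root = {}
    for path, mapping in dotted.items():
        parts = path.split(".")
        node = root
        # Walk (and create) one object level per path segment except the last.
        for part in parts[:-1]:
            node = node.setdefault(part, {}).setdefault("properties", {})
        node[parts[-1]] = mapping
    return root


# Dotted names as they appear in the template posted earlier in the thread.
dotted = {
    "aws.cloudtrail.flattened.additional_eventdata": {"type": "flattened"},
    "aws.cloudtrail.flattened.request_parameters": {"type": "flattened"},
    "aws.cloudtrail.flattened.response_elements": {"type": "flattened"},
    "aws.cloudtrail.flattened.service_event_details": {"type": "flattened"},
}
print(json.dumps(nest_properties(dotted), indent=2))
```

The result contains only the flattened part of the mapping; the keyword fields with a text sub-field would still have to be added next to it.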

Leandro Pereira, thanks for the recommendation. I slightly reworked my template and now I don't see any problems. I'll run one more test; let's hope everything goes well.