Pipeline: field [evidence] not present as part of path [threatintel.evidence]

Hi,

I have an issue when trying to ingest data from a CSV file via Logstash to Elasticsearch.
My data contains a field holding an array of nested JSON objects that I want to parse properly using the JSON processor in the ingest pipeline.
When testing the pipeline directly from the Console (sending a PUT with one line of the CSV), the data is parsed correctly with no errors and the mapping is as expected.
However, when I try to ingest the whole CSV file through Logstash, I get an error for the field used in the JSON processor.

The ingest pipeline is defined as below:

PUT _ingest/pipeline/vm-threatintel-cve-pipeline
{
  "version": 1,
  "processors": [
    {
      "remove": {
        "if": "ctx?.host != null",
        "field": "host"
      }
    },
    {
      "csv": {
        "field": "message",
        "separator": ";",
        "quote" : "'",
        "target_fields": [
          "threatintel.id",
          "threatintel.score",
          "threatintel.risk_matches",
          "threatintel.evidence"
        ],
        "trim": true,
        "on_failure": [
          {
            "set": {
              "field": "error",
              "value": "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    },
    {
      "grok": {
        "if": "ctx?.path != null",
        "field": "path",
        "patterns": [
          "%{DATA}_%{DATA}_%{INT:date}\\.csv$"
        ]
      }
    },
    {
      "date": {
        "if": "ctx?.date != null",
        "field": "date",
        "target_field": "@timestamp",
        "formats": [
          "MMddyyyy"
        ]
      }
    },
    {
      "convert": {
        "if": "ctx.threatintel?.score != null",
        "field": "threatintel.score",
        "type": "integer"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
            if (ctx.threatintel?.score != null) {
              if (ctx.threatintel.score > 90) {
                ctx.threatintel.severity = 'Very Critical';
              } else if (ctx.threatintel.score >= 80) {
                ctx.threatintel.severity = 'Critical';
              } else if (ctx.threatintel.score >= 65) {
                ctx.threatintel.severity = 'High';
              } else if (ctx.threatintel.score >= 25) {
                ctx.threatintel.severity = 'Medium';
              } else if (ctx.threatintel.score >= 5) {
                ctx.threatintel.severity = 'Low';
              }
            } else {
              ctx.threatintel.severity = 'None';
            }
          """
      }
    },
    {
      "rename": {
        "field": "message",
        "target_field": "raw_data"
      }
    },
    {
      "json" : {
        "field" : "threatintel.evidence"
      }
    }
  ]
}
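
For reference, the pipeline can also be exercised without indexing anything via the simulate API, which makes it easier to compare Console behavior with what Logstash sends. The message and path values below are illustrative placeholders, not real data:

POST _ingest/pipeline/vm-threatintel-cve-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "CVE-ZZZZ-ZZZZ; 15; 21/122; [{\"Rule\": \"Linked to Historical Cyber Exploit\", \"Criticality\": \"1.0\"}]",
        "path": "vendor_cve_02012021.csv"
      }
    }
  ]
}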

The full error on logstash:

[2021-02-02T16:07:28,437][WARN ][logstash.outputs.elasticsearch][main][...] 
Could not index event to Elasticsearch. 
{
    :status=>400, 
    :action=>
      [
        "index", 
        {
            :_id=>nil, 
            :_index=>"vm-threatintel-cve", 
            :routing=>nil, 
            :_type=>"_doc", 
            :pipeline=>"vm-threatintel-cve-pipeline"
         }, 
      #<LogStash::Event:0xa3160f5>], 
    :response=>
      {
        "index"=>
            {
                "_index"=>"vm-threatintel-cve", 
                "_type"=>"_doc", 
                "_id"=>nil, 
                "status"=>400, 
                "error"=>
                    {
                        "type"=>"illegal_argument_exception", 
                        "reason"=>"field [evidence] not present as part of path [threatintel.evidence]"
                     }
               }
       }
}

The mappings of my index called vm-threatintel-cve:

{
  "vm-threatintel-cve" : {
    "mappings" : {
      "properties" : {
        "raw_data" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "threatintel" : {
          "properties" : {
            "evidence" : {
              "properties" : {
                "Criticality" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "CriticalityLabel" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "EvidenceString" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "MitigationString" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "Name" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "Rule" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "RuleCategory" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "Timestamp" : {
                  "type" : "date"
                }
              }
            },
            "id" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "risk_matches" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "score" : {
              "type" : "long"
            },
            "severity" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        }
      }
    }
  }
}

After correct ingestion, the data should look something like this:

{
  "_index" : "vm-threatintel-cve",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "threatintel" : {
      "severity" : "Low",
      "score" : 15,
      "evidence" : [
        {
          "Rule" : "Linked to Historical Cyber Exploit",
          "CriticalityLabel" : "Low",
          "EvidenceString" : "1 sighting on 1 source: xxxxx. Most recent link : xxxx",
          "Timestamp" : "2017-03-23T07:53:19.000Z",
          "Name" : "linkedToCyberExploit",
          "RuleCategory" : "Threat",
          "MitigationString" : "",
          "Criticality" : "1.0"
        },
        {
          "Rule" : "Historical Verified Proof of Concept Available",
          "CriticalityLabel" : "Medium",
          "EvidenceString" : "1 sighting on 1 source: yyyyy. 1 execution type: yyy. Most recent link : yyyy",
          "Timestamp" : "2014-01-09T00:00:00.000Z",
          "Name" : "pocVerified",
          "RuleCategory" : "Threat",
          "MitigationString" : "",
          "Criticality" : "5.0"
        }
      ],
      "id" : "CVE-ZZZZ-ZZZZ",
      "risk_matches" : "21/122"
    },
    "raw_data" : """ CVE-ZZZZ-ZZZZ; 15; 21/122; [{"Rule": "Linked to Historical Cyber Exploit", "CriticalityLabel": "Low", "EvidenceString": "1 sighting on 1 source: xxxxx. Most recent link : xxxx", "Timestamp": "2017-03-23T07:53:19.000Z", "Name": "linkedToCyberExploit", "RuleCategory": "Threat", "MitigationString": "", "Criticality": "1.0"}, {"Rule": "Historical Verified Proof of Concept Available", "CriticalityLabel": "Medium", "EvidenceString": "1 sighting on 1 source: yyyyy. 1 execution type: yyy. Most recent link : yyyy", "Timestamp": "2014-01-09T00:00:00.000Z", "Name": "pocVerified", "RuleCategory": "Threat", "MitigationString": "", "Criticality": "5.0"}] """
  }
}

If I remove the JSON processor from my ingest pipeline, there are no issues; the error appears only when I include it.
My goal is to be able to index the nested JSON object properly to create proper filters, dashboards, etc.
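
From what I understand, the error means the csv processor never set threatintel.evidence on that event (for example, when a row ends up with fewer columns than target_fields), so the json processor has nothing to parse. A possible stopgap is to guard the json processor so such rows are indexed without parsed evidence instead of being rejected; this is only a sketch, not a fix for the underlying CSV issue:

{
  "json": {
    "if": "ctx.threatintel?.evidence != null",
    "field": "threatintel.evidence"
  }
}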

Appreciate any help if anyone has had a similar issue or any alternative solutions to achieve the same goal.

Hi,

Can you try to parse just a few lines of your CSV? I'm wondering if there are any special characters or multiline events that are breaking it.

Thanks

Hi Clement,

I've found one error in the actual data I was trying to ingest: one row wasn't correctly formatted as JSON, so it failed even in the ES Console.
After fixing that, I am able to ingest any line from the CSV without issues from within the Console, but the problem with Logstash still persists.
I also tried a small subset (5 lines) of the actual CSV: each line is ingested correctly from the Console, but the same problem occurs with Logstash.

I also tried ingesting with Logstash without the JSON processor in the ingest pipeline, and strangely it's still not able to ingest the field containing the JSON object properly, giving me an "Illegal character inside unquoted field" error.

Any ideas on what else could be the issue?

Thanks

I found a workaround by simply adding another semicolon (my column delimiter) at the end of each line in my CSV. Worked like a charm.
I still don't understand why this issue appeared: even with a plain string in place of the JSON object in the CSV, I got the "Illegal character inside unquoted field" error. My other indices, which use similar ingest pipelines, work well and didn't require the additional column delimiter.
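
One possible explanation: if empty_value is not set, the csv processor skips empty fields entirely, so a row whose last column is empty never gets threatintel.evidence at all, which would also produce the "not present as part of path" error. Setting it explicitly might avoid the need for the trailing delimiter; an untested sketch of the relevant option:

{
  "csv": {
    "field": "message",
    "separator": ";",
    "quote": "'",
    "target_fields": [
      "threatintel.id",
      "threatintel.score",
      "threatintel.risk_matches",
      "threatintel.evidence"
    ],
    "trim": true,
    "empty_value": ""
  }
}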

Other solutions I tried, such as this one, didn't work for me.