Hi,
I have an issue when trying to ingest data from a CSV file via Logstash to Elasticsearch.
My data contains a nested JSON array that I want to parse properly using the JSON processor in the ingest pipeline.
When I test the pipeline directly from the console (sending a PUT with one line of the CSV), the data is parsed correctly with no errors and the mapping comes out as expected.
However, when I ingest the whole CSV file through Logstash, I get an error on the field used by the JSON processor.
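On the Logstash side, the config is essentially just a file input reading the CSV and an elasticsearch output pointing at the ingest pipeline, roughly like this (the paths and hosts below are placeholders, not my real values):

input {
  file {
    # placeholder path; the real files are named like xxx_yyy_MMddyyyy.csv
    path => "/path/to/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

output {
  elasticsearch {
    # placeholder host
    hosts => ["http://localhost:9200"]
    index => "vm-threatintel-cve"
    pipeline => "vm-threatintel-cve-pipeline"
  }
}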
The ingest pipeline is defined as follows:
PUT _ingest/pipeline/vm-threatintel-cve-pipeline
{
  "version": 1,
  "processors": [
    {
      "remove": {
        "if": "ctx?.host != null",
        "field": "host"
      }
    },
    {
      "csv": {
        "field": "message",
        "separator": ";",
        "quote": "'",
        "target_fields": [
          "threatintel.id",
          "threatintel.score",
          "threatintel.risk_matches",
          "threatintel.evidence"
        ],
        "trim": true,
        "on_failure": [
          {
            "set": {
              "field": "error",
              "value": "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    },
    {
      "grok": {
        "if": "ctx?.path != null",
        "field": "path",
        "patterns": [
          "%{DATA}_%{DATA}_%{INT:date}.csv$"
        ]
      }
    },
    {
      "date": {
        "if": "ctx?.date != null",
        "field": "date",
        "target_field": "@timestamp",
        "formats": [
          "MMddyyyy"
        ]
      }
    },
    {
      "convert": {
        "if": "ctx.threatintel?.score != null",
        "field": "threatintel.score",
        "type": "integer"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          if (ctx.threatintel.score != null) {
            if (ctx.threatintel.score > 90) {
              ctx.threatintel.severity = 'Very Critical';
            } else if (ctx.threatintel.score >= 80 && ctx.threatintel.score < 90) {
              ctx.threatintel.severity = 'Critical';
            } else if (ctx.threatintel.score >= 65 && ctx.threatintel.score < 80) {
              ctx.threatintel.severity = 'High';
            } else if (ctx.threatintel.score >= 25 && ctx.threatintel.score < 65) {
              ctx.threatintel.severity = 'Medium';
            } else if (ctx.threatintel.score >= 5 && ctx.threatintel.score < 25) {
              ctx.threatintel.severity = 'Low';
            }
          } else {
            ctx.threatintel.severity = 'None';
          }
        """
      }
    },
    {
      "rename": {
        "field": "message",
        "target_field": "raw_data"
      }
    },
    {
      "json": {
        "field": "threatintel.evidence"
      }
    }
  ]
}
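For reference, the console test that works is just a single-document PUT through the same pipeline, something along these lines (the message here is shortened and anonymized, same format as the raw_data field shown further down):

PUT vm-threatintel-cve/_doc/1?pipeline=vm-threatintel-cve-pipeline
{
  "message": "CVE-ZZZZ-ZZZZ; 15; 21/122; [{\"Rule\": \"Linked to Historical Cyber Exploit\", \"CriticalityLabel\": \"Low\", \"Timestamp\": \"2017-03-23T07:53:19.000Z\", \"Name\": \"linkedToCyberExploit\", \"RuleCategory\": \"Threat\", \"MitigationString\": \"\", \"Criticality\": \"1.0\"}]"
}

With that single document, threatintel.evidence comes out as a proper array of objects and the mapping further below is created as expected.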
The full error from Logstash:
[2021-02-02T16:07:28,437][WARN ][logstash.outputs.elasticsearch][main][...]
Could not index event to Elasticsearch.
{
  :status=>400,
  :action=>
    [
      "index",
      {
        :_id=>nil,
        :_index=>"vm-threatintel-cve",
        :routing=>nil,
        :_type=>"_doc",
        :pipeline=>"vm-threatintel-cve-pipeline"
      },
      #<LogStash::Event:0xa3160f5>
    ],
  :response=>
    {
      "index"=>
        {
          "_index"=>"vm-threatintel-cve",
          "_type"=>"_doc",
          "_id"=>nil,
          "status"=>400,
          "error"=>
            {
              "type"=>"illegal_argument_exception",
              "reason"=>"field [evidence] not present as part of path [threatintel.evidence]"
            }
        }
    }
}
The mapping of my index vm-threatintel-cve:
{
  "vm-threatintel-cve" : {
    "mappings" : {
      "properties" : {
        "raw_data" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "threatintel" : {
          "properties" : {
            "evidence" : {
              "properties" : {
                "Criticality" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "CriticalityLabel" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "EvidenceString" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "MitigationString" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "Name" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "Rule" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "RuleCategory" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "Timestamp" : {
                  "type" : "date"
                }
              }
            },
            "id" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "risk_matches" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "score" : {
              "type" : "long"
            },
            "severity" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        }
      }
    }
  }
}
After correct ingestion, the data should look something like this:
{
  "_index" : "vm-threatintel-cve",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "threatintel" : {
      "severity" : "Low",
      "score" : 15,
      "evidence" : [
        {
          "Rule" : "Linked to Historical Cyber Exploit",
          "CriticalityLabel" : "Low",
          "EvidenceString" : "1 sighting on 1 source: xxxxx. Most recent link : xxxx",
          "Timestamp" : "2017-03-23T07:53:19.000Z",
          "Name" : "linkedToCyberExploit",
          "RuleCategory" : "Threat",
          "MitigationString" : "",
          "Criticality" : "1.0"
        },
        {
          "Rule" : "Historical Verified Proof of Concept Available",
          "CriticalityLabel" : "Medium",
          "EvidenceString" : "1 sighting on 1 source: yyyyy. 1 execution type: yyy. Most recent link : yyyy",
          "Timestamp" : "2014-01-09T00:00:00.000Z",
          "Name" : "pocVerified",
          "RuleCategory" : "Threat",
          "MitigationString" : "",
          "Criticality" : "5.0"
        }
      ],
      "id" : "CVE-ZZZZ-ZZZZ",
      "risk_matches" : "21/122"
    },
    "raw_data" : """ CVE-ZZZZ-ZZZZ; 15; 21/122; [{"Rule": "Linked to Historical Cyber Exploit", "CriticalityLabel": "Low", "EvidenceString": "1 sighting on 1 source: xxxxx. Most recent link : xxxx", "Timestamp": "2017-03-23T07:53:19.000Z", "Name": "linkedToCyberExploit", "RuleCategory": "Threat", "MitigationString": "", "Criticality": "1.0"}, {"Rule": "Historical Verified Proof of Concept Available", "CriticalityLabel": "Medium", "EvidenceString": "1 sighting on 1 source: yyyyy. 1 execution type: yyy. Most recent link : yyyy", "Timestamp": "2014-01-09T00:00:00.000Z", "Name": "pocVerified", "RuleCategory": "Threat", "MitigationString": "", "Criticality": "5.0"}] """
  }
}
If I remove the JSON processor from the ingest pipeline, there are no issues; the error only appears when I include it.
My goal is to index the nested JSON objects properly so that I can build filters, dashboards, etc. on them.
I'd appreciate any help if someone has run into a similar issue, or any alternative approach that achieves the same goal.