Group field from multiple rows based on common key

spotlight.csv:

ENTITY_ID,ENTITY_TYPE,SPN_FIRM_ID,IA_ID,BRANCH_ID
1323575,AC,101,10503,11002
1323575,AC,101,10503,359006

Expected output JSON:
{
"keyInformation": {
"entityId": 1323575,
"entityType": "AC",
"firmId": 101
},
"entitlements": {
"branchId": [ 11002,359006],
"iaId": 10503
}
}

spotlight.conf:

# Source of events: one event per line of the spotlight CSV file.
input {
  file {
    # "NUL" is the Windows null device: the read position is never persisted,
    # so the file is processed again from the start on every run.
    sincedb_path   => "NUL"
    start_position => "beginning"
    path           => "C:/ELK/logstash-7.2.0/config/spotlight.csv"
  }
}

# Parse each CSV line and reshape the flat columns into the nested
# keyInformation / entitlements structure of the target document.
filter {
  csv {
    separator => ","
    columns => ["ENTITY_ID", "ENTITY_TYPE", "SPN_FIRM_ID", "IA_ID", "BRANCH_ID"]
    # spotlight.csv begins with a header line. With explicit columns, any row
    # whose values exactly match the column names is dropped instead of being
    # indexed as a document.
    skip_header => true
    # The csv filter produces strings by default; the expected JSON carries
    # these ids as numbers.
    convert => {
      "ENTITY_ID"   => "integer"
      "SPN_FIRM_ID" => "integer"
      "IA_ID"       => "integer"
      "BRANCH_ID"   => "integer"
    }
  }

  # Move the flat CSV columns under their nested target field names.
  mutate {
    rename => {
      "ENTITY_ID"   => "[keyInformation][entityId]"
      "ENTITY_TYPE" => "[keyInformation][entityType]"
      "SPN_FIRM_ID" => "[keyInformation][firmId]"
      "IA_ID"       => "[entitlements][iaId]"
      "BRANCH_ID"   => "[entitlements][branchId]"
    }
  }
}
# Destinations: console dump for debugging plus an upsert into Elasticsearch.
output {
  # Print every event in rubydebug form.
  stdout { codec => rubydebug }

  # One document per entity id; an existing document is updated, a missing
  # one is created from the event (doc_as_upsert).
  elasticsearch {
    hosts         => ["localhost:9200"]
    user          => "elastic"
    password      => "test01"
    index         => "spotlight"
    document_type => "_doc"
    document_id   => "%{[keyInformation][entityId]}"
    action        => "update"
    doc_as_upsert => true
  }
}

The branch ID differs between rows that share the same key, so I need to collect the branch IDs into a list for that key. Can someone help me achieve this with Logstash?

Use an aggregate filter.

    # Column names come from the header row. The numeric id columns are
    # converted so the aggregated document holds numbers, as in the expected
    # JSON, instead of the strings the csv filter produces by default.
    csv {
        autodetect_column_names => true
        convert => {
            "ENTITY_ID"   => "integer"
            "SPN_FIRM_ID" => "integer"
            "IA_ID"       => "integer"
            "BRANCH_ID"   => "integer"
        }
    }
    # Fold every row that shares an ENTITY_ID into a single event.
    aggregate {
        task_id => "%{ENTITY_ID}"
        # The Ruby code is wrapped in single quotes, so it must only use
        # double quotes internally.
        code => '
            # Scalar fields: the last row seen for the task wins.
            map["keyInformation"] ||= {}
            map["keyInformation"]["entityId"] = event.get("ENTITY_ID")
            map["keyInformation"]["entityType"] = event.get("ENTITY_TYPE")
            map["keyInformation"]["firmId"] = event.get("SPN_FIRM_ID")
            map["entitlements"] ||= {}
            map["entitlements"]["iaId"] = event.get("IA_ID")
            # branchId accumulates one entry per row.
            map["entitlements"]["branchId"] ||= []
            map["entitlements"]["branchId"] << event.get("BRANCH_ID")
            # Discard the per-row event; only the aggregated map is emitted.
            event.cancel
        '
        # Emit the map as a new event once no row for the task has arrived
        # for `timeout` seconds.
        push_map_as_event_on_timeout => true
        timeout => 2
    }

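Make sure you run Logstash with `--pipeline.workers 1`. The aggregate filter only works correctly when a single worker processes the events in order, and `autodetect_column_names` likewise depends on the header row being seen first.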

Thanks Badger, this is really helpful.

One more question.

I need the output to look like this:

Output:

"associatedAccounts": [
{
"accountId": 728006,
"accountNumber": "5BOMD9Q5WMCAYNM",
"iaId": 903751,
"branchId": 426176,
"areaId": 572198,
"regionId": 942753,
"modelOwnerId": 925453,
"programId": "805554",
"lastUpdated": "Jun 21, 2019 12:09:33 PM"
},
{
"accountId": 646757,
"accountNumber": "EQ5GFVQO3MYYQFX",
"iaId": 305899,
"branchId": 337585,
"areaId": 582687,
"regionId": 438242,
"modelOwnerId": 906315,
"programId": "805554",
"lastUpdated": "Jun 21, 2019 12:09:33 PM"
}
]

Config:

# Parse the CSV, discard the header line, and fold all rows that share an
# ENTITY_ID into one event with an associatedAccounts list (one entry per row).
filter {
  csv { autodetect_column_names => true }

  # Drop the header line if it reaches this point as an event.
  if [message] =~ /^ENVIRONMENT_ID/ {
    drop {}
  }

  aggregate {
    task_id => "%{ENTITY_ID}"
    # The Ruby code is delimited by single quotes, so it must not contain any
    # single quote itself: the first one would end the string and break the
    # config parse. Use double quotes for every Ruby string inside it.
    code => '
      # Scalar sections: the last row seen for the task wins.
      map["keyInformation"] ||= {}
      map["keyInformation"]["entityId"] = event.get("ENTITY_ID")
      map["keyInformation"]["entityType"] = event.get("ENTITY_TYPE")
      map["keyInformation"]["firmId"] = event.get("SPN_FIRM_ID")

      map["descriptive"] ||= {}
      map["descriptive"]["householdTitle"] = event.get("HOUSEHOLDTITLE")
      map["descriptive"]["tagType"] = event.get("TAG_TYPE")

      # One hash per row. Entries of a Ruby hash literal must be separated
      # by commas.
      map["associatedAccounts"] ||= []
      map["associatedAccounts"] << {
        "accountNumber" => event.get("ACCOUNTNUMBER"),
        "accountId" => event.get("ACCT_ID"),
        "modelOwnerId" => event.get("MGR_ID"),
        "programId" => event.get("PROGRAM_ID"),
        "iaId" => event.get("IA_ID"),
        "branchId" => event.get("BRANCH_ID"),
        # NOTE(review): source column AREAD_ID kept as written; confirm it
        # matches the CSV header (it may be AREA_ID).
        "areaId" => event.get("AREAD_ID"),
        "regionId" => event.get("REGION_ID")
      }

      # Discard the per-row event; only the aggregated map is emitted.
      event.cancel
    '
    # Emit the map as a new event once no row for the task has arrived for
    # `timeout` seconds.
    push_map_as_event_on_timeout => true
    timeout => 2
  }
}

I am getting a Logstash configuration error. Can you help me?

What is the error?

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, {, } at line 28, column 9 (byte 783) after filter {\r\n csv { autodetect_column_names => true }\r\n \r\n if [message] =~ /^ENVIRONMENT_ID/ {\r\n\tdrop {}\r\n }\r\n aggregate {\r\n task_id => "%{ENTITY_ID}"\r\n code => '\r\n map["keyInformation"] ||= {}\r\n map["keyInformation"]["entityId"] = event.get("ENTITY_ID")\r\n map["keyInformation"]["entityType"] = event.get("ENTITY_TYPE")\r\n map["keyInformation"]["firmId"] = event.get("SPN_FIRM_ID")\r\n \r\n\t\t\tmap["descriptive"] ||= {}\r\n\t\t\tmap["descriptive"]["householdTitle"] = event.get("HOUSEHOLDTITLE")\r\n\t\t\tmap["descriptive"]["tagType"] = event.get("TAG_TYPE")\r\n\t\t\t\r\n\t\t\r\n\t\t\tmap['", :backtrace=>["C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/compiler.rb:41:in compile_imperative'", "C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/compiler.rb:49:incompile_graph'", "C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/compiler.rb:11:in block in compile_sources'", "org/jruby/RubyArray.java:2577:inmap'", "C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/compiler.rb:10:in compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:ininitialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in initialize'", "C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/java_pipeline.rb:24:ininitialize'", "C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/pipeline_action/create.rb:36:in execute'", "C:/ELK/logstash-7.2.0/logstash-core/lib/logstash/agent.rb:325:inblock in converge_state'"]}

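The code block is surrounded by single quotes, so the first single quote inside it (at map['associatedAccounts']) ends the string early, which is exactly where the parser gives up in the error above. Use double quotes inside the code block instead (I wrap the block in single quotes so that I can use string magic when I need to).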

Thanks.