Aggregate filter not working in csv output plugin

I am using the aggregate filter to merge similar rows of my CSV file into one. I read the aggregate filter guide and wrote the config file according to my needs.
Here's my CSV file:
|state|city|haps|ads|
|---|---|---|---|
|tamil nadu|tirunelveli|hap0|ad1|
|tamil nadu|nagerkoil|hap0|ad1|
|tamil nadu|tuticorin|hap0|ad1|
|tamil nadu|madurai|hap0|ad1|
|tamil nadu|chennai|hap0|ad1|
|kerala|palakad|hap1|ad2|
|kerala|guruvayor|hap1|ad2|
|kerala|kolikodu|hap1|ad2|
|kerala|kottayam|hap1|ad2|
|kerala|idukki|hap1|ad2|
|mumbai|Akola|hap2|ad3|
|mumbai|Washim|hap2|ad3|
|mumbai|Jalna|hap2|ad3|
|mumbai|Nanded|hap2|ad3|
|mumbai|Latur|hap2|ad3|

I want to group the rows of the CSV file by state, so the output CSV file should contain only three rows, one merged row per state.

Here's my Logstash config file for the aggregate filter:

    input {
      file {
        path => "/home/paulsteven/log_cars/aggreagate.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    filter {
      csv {
        separator => ","
        quote_char => "%"
        columns => ["state","city","haps","ads"]
      }
      aggregate {
        task_id => "%{state}"
        code => "
          map['state'] = event.get('state')
          map['cities'] ||= []
          map['cities'] << {'city' => event.get('city')}
          event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 3
      }
    }
    output {
      file {
        path => "/home/paulsteven/temp_out/temp.csv"
        codec => line { format => "custom format: %{message}"}
      }
    }

In the terminal, I get this output:

paulsteven@smackcoders:~$ sudo /usr/share/logstash/bin/logstash -f /home/paulsteven/log_cars/aggfilter.conf 
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jruby.util.SecurityHelper (file:/usr/share/logstash/logstash-core/lib/jars/jruby-complete-9.2.6.0.jar) to field java.lang.reflect.Field.modifiers
WARNING: Please consider reporting this to the maintainers of org.jruby.util.SecurityHelper
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-05-02 12:48:19.764 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2019-05-02 12:48:19.775 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.7.1"}
[INFO ] 2019-05-02 12:48:23.509 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2019-05-02 12:48:24.140 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x21695f0b run>"}
[INFO ] 2019-05-02 12:48:24.227 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2019-05-02 12:48:24.234 [[main]<file] observingtail - START, creating Discoverer, Watch with file and sincedb collections
[INFO ] 2019-05-02 12:48:24.554 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2019-05-02 12:48:25.213 [[main]>worker0] file - Opening file {:path=>"/home/paulsteven/temp_out/temp.csv"}

In the output CSV file I get:

    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}
    custom format: %{message}

I want the output file to look like this:

    state cities
    tamil nadu [{'city': 'nagerkoil'}, {'city': 'madurai'}, {'city': 'tuticorin'}, {'city': 'tirunelveli'}, {'city': 'chennai'}]
    kerala [{'city': 'idukki'}, {'city': 'kottayam'}, {'city': 'palakad'}, {'city': 'guruvayor'}, {'city': 'kolikodu'}]
    mumbai [{'city': 'Jalna'}, {'city': 'Nanded'}, {'city': 'Washim'}, {'city': 'Latur'}, {'city': 'Akola'}]

How can I make this work? I tried the approach recommended on the Logstash site, but it shows an error. Why is this happening, and how can I get the output I want?

There is nothing wrong with your aggregate filter. It works just fine.
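
To see what the filter's `code` option builds, here is a plain-Ruby simulation of the same statements. It is only a sketch and does not use Logstash: `aggregate_rows` is a hypothetical helper, and the "new state flushes the previous map" step is my simplified reading of `push_previous_map_as_event`.

```ruby
# Plain-Ruby simulation of the aggregate filter's code block; no Logstash involved.
# `aggregate_rows` is a hypothetical helper, not part of any plugin.
def aggregate_rows(rows)
  pushed = []        # events the filter would emit
  current_id = nil   # task_id of the map currently being built
  map = nil

  rows.each do |row|
    # Assumed behaviour of push_previous_map_as_event:
    # a new task_id flushes the previous map as an event.
    if row['state'] != current_id
      pushed << map if map
      current_id = row['state']
      map = {}
    end

    # Same statements as the filter's `code` option
    map['state'] = row['state']
    map['cities'] ||= []
    map['cities'] << { 'city' => row['city'] }
  end

  pushed << map if map # in Logstash the last map is flushed by the timeout
  pushed
end

rows = [
  { 'state' => 'tamil nadu', 'city' => 'tirunelveli' },
  { 'state' => 'tamil nadu', 'city' => 'nagerkoil' },
  { 'state' => 'kerala',     'city' => 'palakad' },
  { 'state' => 'kerala',     'city' => 'guruvayor' },
  { 'state' => 'mumbai',     'city' => 'Akola' }
]

events = aggregate_rows(rows)
events.each { |e| puts e.inspect }
```

Each emitted map holds only `state` and `cities`. It has no `message` field, which is most likely why the file output writes `%{message}` literally: Logstash leaves a field reference unchanged when the field does not exist. The simulation also processes rows strictly in order; the aggregate filter documentation asks for a single pipeline worker (`-w 1`) for that reason, while the log in the question shows four.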

Your first problem, if your CSV file really is pipe-delimited, is your csv filter, which should be

    csv {
        separator => "|"
        columns => ["e1", "state","city","haps","ads", "e2"]
    }
    if [state] in [ "---", "state" ] { drop {} }
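
The extra `e1` and `e2` columns are there because every line starts and ends with `|`, so splitting on that character yields an empty field at each end. The sketch below imitates this with `String#split` rather than the csv filter itself (an approximation: a real CSV parser may report the empty fields as nil instead of empty strings) and then drops the header and `---` rows the way the conditional does.

```ruby
# Approximation of the csv filter plus the drop conditional, in plain Ruby.
lines = [
  '|state|city|haps|ads|',
  '|---|---|---|---|',
  '|tamil nadu|tirunelveli|hap0|ad1|',
  '|kerala|palakad|hap1|ad2|'
]

columns = %w[e1 state city haps ads e2]

parsed = lines.map do |line|
  # The -1 limit keeps the trailing empty field instead of discarding it
  fields = line.split('|', -1)
  columns.zip(fields).to_h
end

# Equivalent of: if [state] in [ "---", "state" ] { drop {} }
kept = parsed.reject { |row| ['---', 'state'].include?(row['state']) }

kept.each { |row| puts row.inspect }
```

With `separator => ","` there is no comma to split on, so the whole line would end up in the `state` column and every row would get its own task id.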

If you really want an output file that is partly, but not entirely, JSON, then you will need to install the json_encode plugin and use it:

    json_encode { source => "cities" target => "cities" }
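
As a rough illustration of that step, the following Ruby sketch serializes the `cities` array in place with the standard `json` library and then interpolates it into a line of text. It is a stand-in for the plugin, not its implementation, and the exact whitespace the plugin emits is an assumption.

```ruby
require 'json'

# One aggregated event, as a plain Hash (illustrative data from the question)
event = {
  'state'  => 'kerala',
  'cities' => [{ 'city' => 'palakad' }, { 'city' => 'guruvayor' }]
}

# Stand-in for: json_encode { source => "cities" target => "cities" }
event['cities'] = JSON.generate(event['cities'])

# Once cities is a string, it can be interpolated into a line of text
line = "#{event['state']} #{event['cities']}\n"
print line
```

The encoded value uses JSON double quotes, for example `[{"city":"palakad"},{"city":"guruvayor"}]`, not the single-quoted style shown in the desired output.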

You can then use a plain codec. You need a literal newline in the format.

    output { stdout { codec => plain { format => "%{state} %{cities}
    " } } }
