I am using the Logstash aggregate filter to merge rows with similar data in my CSV file into single records. I read the aggregate filter documentation and wrote a config file to fit my needs.
Here's my CSV file:
|state|city|haps|ads|
|---|---|---|---|
|tamil nadu|tirunelveli|hap0|ad1|
|tamil nadu|nagerkoil|hap0|ad1|
|tamil nadu|tuticorin|hap0|ad1|
|tamil nadu|madurai|hap0|ad1|
|tamil nadu|chennai|hap0|ad1|
|kerala|palakad|hap1|ad2|
|kerala|guruvayor|hap1|ad2|
|kerala|kolikodu|hap1|ad2|
|kerala|kottayam|hap1|ad2|
|kerala|idukki|hap1|ad2|
|mumbai|Akola|hap2|ad3|
|mumbai|Washim|hap2|ad3|
|mumbai|Jalna|hap2|ad3|
|mumbai|Nanded|hap2|ad3|
|mumbai|Latur|hap2|ad3|
I want to group the rows of the CSV file by state. Since there are three states, the output CSV file should contain only three rows, one per state, each holding that state's merged cities.
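To make the intended grouping concrete, here is a minimal plain-Ruby sketch of it, run outside Logstash. It assumes a shortened copy of the rows above, and `group_by_state` is a hypothetical helper name, not part of any Logstash API. It only mirrors what I expect the filter to do for each row: keep one map per state and append each city to it.

```ruby
# Plain-Ruby sketch of the grouping I am after, independent of Logstash.
# ROWS is a shortened copy of the CSV above; group_by_state is a
# hypothetical helper name used only for illustration.
ROWS = [
  ['tamil nadu', 'tirunelveli', 'hap0', 'ad1'],
  ['tamil nadu', 'nagerkoil', 'hap0', 'ad1'],
  ['kerala', 'palakad', 'hap1', 'ad2'],
  ['kerala', 'guruvayor', 'hap1', 'ad2'],
  ['mumbai', 'Akola', 'hap2', 'ad3']
].freeze

def group_by_state(rows)
  maps = {} # one map per state, keyed by the state name
  rows.each do |state, city, _haps, _ads|
    map = (maps[state] ||= { 'state' => state })
    map['cities'] ||= []               # create the list on first use
    map['cities'] << { 'city' => city }
  end
  maps.values # one entry per state, in first-seen order
end

group_by_state(ROWS).each { |entry| puts entry.inspect }
```

With the full CSV this should yield three entries, one per state, which is the shape I want in the output file.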
Here's my Logstash config file with the aggregate filter:
input {
  file {
    path => "/home/paulsteven/log_cars/aggreagate.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    quote_char => "%"
    columns => ["state","city","haps","ads"]
  }
  aggregate {
    # one aggregation map per state
    task_id => "%{state}"
    code => "
      map['state'] = event.get('state')
      # create the list on the first row of a state, then append to it
      map['cities'] ||= []
      map['cities'] << {'city' => event.get('city')}
      # drop the original row; only the aggregated map should be emitted
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
output {
  file {
    path => "/home/paulsteven/temp_out/temp.csv"
    codec => line { format => "custom format: %{message}"}
  }
}
In the terminal, I get this output:
paulsteven@smackcoders:~$ sudo /usr/share/logstash/bin/logstash -f /home/paulsteven/log_cars/aggfilter.conf
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jruby.util.SecurityHelper (file:/usr/share/logstash/logstash-core/lib/jars/jruby-complete-9.2.6.0.jar) to field java.lang.reflect.Field.modifiers
WARNING: Please consider reporting this to the maintainers of org.jruby.util.SecurityHelper
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-05-02 12:48:19.764 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2019-05-02 12:48:19.775 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.7.1"}
[INFO ] 2019-05-02 12:48:23.509 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2019-05-02 12:48:24.140 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x21695f0b run>"}
[INFO ] 2019-05-02 12:48:24.227 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2019-05-02 12:48:24.234 [[main]<file] observingtail - START, creating Discoverer, Watch with file and sincedb collections
[INFO ] 2019-05-02 12:48:24.554 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2019-05-02 12:48:25.213 [[main]>worker0] file - Opening file {:path=>"/home/paulsteven/temp_out/temp.csv"}
In the output CSV file I get:
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
custom format: %{message}
I want the output file to look like this:
|state|cities|
|---|---|
|tamil nadu|[{'city': 'nagerkoil'}, {'city': 'madurai'}, {'city': 'tuticorin'}, {'city': 'tirunelveli'}, {'city': 'chennai'}]|
|kerala|[{'city': 'idukki'}, {'city': 'kottayam'}, {'city': 'palakad'}, {'city': 'guruvayor'}, {'city': 'kolikodu'}]|
|mumbai|[{'city': 'Jalna'}, {'city': 'Nanded'}, {'city': 'Washim'}, {'city': 'Latur'}, {'city': 'Akola'}]|
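How can I make this work? I followed the approach recommended in the Logstash documentation, but it does not work: the output file contains only the literal text `%{message}` instead of the aggregated data. Why is this happening, and how can I get the grouped output?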