Logstash aggregate filter is inconsistent

I'm trying to group multiple cities under the same country using data from JDBC. I'm using the push_previous_map_as_event option, but the result is inconsistent even though the SQL is ordered correctly by country code.

This is my logstash conf file

# Pipeline: read (country code, city name) rows over JDBC, fold all rows that
# share a country code into a single event, and print the result to stdout.
input {
jdbc {
jdbc_driver_library => "C:\logstash\ifxjdbc.jar"
jdbc_driver_class => "com.informix.jdbc.IfxDriver"
jdbc_connection_string => "jdbc:informix-sqli://HOST:PORT/:INFORMIXSERVER=_soc;DBDATE=MDY4/;DB_LOCALE=en_US.57372"
jdbc_user => "user"
jdbc_password => "password"
# Cron-style schedule; with this value the query is re-run roughly every 30 seconds.
schedule => "*/30 * * * * *"
# Currently effective cities joined to their currently effective country,
# restricted to two countries and ordered by country code only (the order of
# cities within a country is not fixed by the query).
statement => "select distinct co.co_ctry_cd, ct_city_nm from city ct
inner join country co on co.co_ctry_lk = ct.co_ctry_lk AND (co.CO_REC_EFFECT_DT <= today AND(co.CO_REC_EXPIRY_DT >= today OR co.CO_REC_EXPIRY_DT IS NULL))
where (ct.CT_REC_EFFECT_DT <= today AND(ct.CT_REC_EXPIRY_DT >= today OR ct.CT_REC_EXPIRY_DT IS NULL))
and ct.co_ctry_lk in(111,113) order by co.co_ctry_cd"
}
}
filter {
# NOTE: aggregation state is kept per pipeline worker. As explained in the reply
# to this post, with pipeline.workers > 1 each worker aggregates only a subset
# of the rows, so the same country can be emitted several times. Run this
# pipeline with pipeline.workers set to 1.
aggregate {
# One aggregation map per country code.
task_id => "%{co_ctry_cd}"
# The code below stores the country code once, appends this row's city name to
# the map's 'cities' array, and cancels the per-row event so that only the
# aggregated map is output.
code => "
map['co_ctry_cd'] ||= event.get('co_ctry_cd')
map['cities'] ||= []
map['cities'] << {
'ct_city_nm' => event.get('ct_city_nm'),
}
event.cancel()
"
# Emit the previous map as a new event when a row with a different task_id arrives.
push_previous_map_as_event => true
# NOTE(review): presumably in seconds; the last map of each run has no following
# task_id to push it, so it is only emitted when this timeout expires -- confirm.
timeout => 10
}
}
output {
# Print every resulting event to the console.
stdout {
}
}

The aggregated result is inconsistent: the first 3 iterations are good, but starting from the 4th, multiple maps of the same country are pushed as separate events instead of one. Please scroll to the bottom and refer to the bold section (ITERATION #4).

Below is the Logstash output. Does anyone know what the problem is?

C:\Development\logstash-7.5.1\bin>logstash -f C:\logstash\informix_2.conf
Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to C:/Development/logstash-7.5.1/logs which is now configured via log4j2.properties
[2019-12-24T14:40:34,189][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-12-24T14:40:34,450][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.5.1"}
[2019-12-24T14:40:37,855][INFO ][org.reflections.Reflections] Reflections took 84 ms to scan 1 urls, producing 20 keys and 40 values
[2019-12-24T14:40:39,427][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2019-12-24T14:40:39,453][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["C:/logstash/informix_2.conf"], :thread=>"#<Thread:0x7220fc run>"}
[2019-12-24T14:40:39,882][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2019-12-24T14:40:40,019][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-24T14:40:40,711][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
C:/Development/logstash-7.5.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
[2019-12-24T14:41:02,670][INFO ][logstash.inputs.jdbc ][main] (0.183461s) select distinct co.co_ctry_cd, ct_city_nm from city ct
inner join country co on co.co_ctry_lk = ct.co_ctry_lk AND (co.CO_REC_EFFECT_DT <= today AND(co.CO_REC_EXPIRY_DT >= today OR co.CO_REC_EXPIRY_DT IS NULL))
where (ct.CT_REC_EFFECT_DT <= today AND(ct.CT_REC_EXPIRY_DT >= today OR ct.CT_REC_EXPIRY_DT IS NULL))
and ct.co_ctry_lk in(111,113) order by co.co_ctry_cd
C:/Development/logstash-7.5.1/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
ITERATION #1 - AW has 8 cities, BB has 12, which is the expected result
{
"co_ctry_cd" => "AW",
"cities" => [
[0] {
"ct_city_nm" => "ANGOCHI"
},
[1] {
"ct_city_nm" => "ORANJESTAD"
},
[2] {
"ct_city_nm" => "NOORD"
},
[3] {
"ct_city_nm" => "ARUBA"
},
[4] {
"ct_city_nm" => "PARADERA"
},
[5] {
"ct_city_nm" => "SAVANETA"
},
[6] {
"ct_city_nm" => "SANTA CRUZ"
},
[7] {
"ct_city_nm" => "SAN NICOLAS"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:41:02.959Z
}
{
"co_ctry_cd" => "BB",
"cities" => [
[ 0] {
"ct_city_nm" => "BRIDGETOWN"
},
[ 1] {
"ct_city_nm" => "ST GEORGE"
},
[ 2] {
"ct_city_nm" => "ST ANDREW"
},
[ 3] {
"ct_city_nm" => "CHRIST CHURCH"
},
[ 4] {
"ct_city_nm" => "ST LUCY"
},
[ 5] {
"ct_city_nm" => "ST JAMES"
},
[ 6] {
"ct_city_nm" => "ST JOSEPH"
},
[ 7] {
"ct_city_nm" => "ST JOHN"
},
[ 8] {
"ct_city_nm" => "ST THOMAS"
},
[ 9] {
"ct_city_nm" => "ST MICHAEL"
},
[10] {
"ct_city_nm" => "ST PHILIP"
},
[11] {
"ct_city_nm" => "ST PETER"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:41:14.918Z
}
[2019-12-24T14:41:30,856][INFO ][logstash.inputs.jdbc ][main] (0.221281s) select distinct co.co_ctry_cd, ct_city_nm from city ct
inner join country co on co.co_ctry_lk = ct.co_ctry_lk AND (co.CO_REC_EFFECT_DT <= today AND(co.CO_REC_EXPIRY_DT >= today OR co.CO_REC_EXPIRY_DT IS NULL))
where (ct.CT_REC_EFFECT_DT <= today AND(ct.CT_REC_EXPIRY_DT >= today OR ct.CT_REC_EXPIRY_DT IS NULL))
and ct.co_ctry_lk in(111,113) order by co.co_ctry_cd
ITERATION #2 - AW has 8 cities, BB has 12, which is the expected result
{
"co_ctry_cd" => "AW",
"cities" => [
[0] {
"ct_city_nm" => "ANGOCHI"
},
[1] {
"ct_city_nm" => "ARUBA"
},
[2] {
"ct_city_nm" => "ORANJESTAD"
},
[3] {
"ct_city_nm" => "NOORD"
},
[4] {
"ct_city_nm" => "PARADERA"
},
[5] {
"ct_city_nm" => "SAVANETA"
},
[6] {
"ct_city_nm" => "SANTA CRUZ"
},
[7] {
"ct_city_nm" => "SAN NICOLAS"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:41:30.983Z
}
{
"co_ctry_cd" => "BB",
"cities" => [
[ 0] {
"ct_city_nm" => "BRIDGETOWN"
},
[ 1] {
"ct_city_nm" => "ST GEORGE"
},
[ 2] {
"ct_city_nm" => "ST ANDREW"
},
[ 3] {
"ct_city_nm" => "CHRIST CHURCH"
},
[ 4] {
"ct_city_nm" => "ST JAMES"
},
[ 5] {
"ct_city_nm" => "ST LUCY"
},
[ 6] {
"ct_city_nm" => "ST JOSEPH"
},
[ 7] {
"ct_city_nm" => "ST JOHN"
},
[ 8] {
"ct_city_nm" => "ST MICHAEL"
},
[ 9] {
"ct_city_nm" => "ST THOMAS"
},
[10] {
"ct_city_nm" => "ST PHILIP"
},
[11] {
"ct_city_nm" => "ST PETER"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:41:49.918Z
}
[2019-12-24T14:42:00,823][INFO ][logstash.inputs.jdbc ][main] (0.137559s) select distinct co.co_ctry_cd, ct_city_nm from city ct
inner join country co on co.co_ctry_lk = ct.co_ctry_lk AND (co.CO_REC_EFFECT_DT <= today AND(co.CO_REC_EXPIRY_DT >= today OR co.CO_REC_EXPIRY_DT IS NULL))
where (ct.CT_REC_EFFECT_DT <= today AND(ct.CT_REC_EXPIRY_DT >= today OR ct.CT_REC_EXPIRY_DT IS NULL))
and ct.co_ctry_lk in(111,113) order by co.co_ctry_cd
ITERATION #3 - AW has 8 cities, BB has 12, which is the expected result
{
"co_ctry_cd" => "AW",
"cities" => [
[0] {
"ct_city_nm" => "ANGOCHI"
},
[1] {
"ct_city_nm" => "NOORD"
},
[2] {
"ct_city_nm" => "ORANJESTAD"
},
[3] {
"ct_city_nm" => "ARUBA"
},
[4] {
"ct_city_nm" => "PARADERA"
},
[5] {
"ct_city_nm" => "SANTA CRUZ"
},
[6] {
"ct_city_nm" => "SAVANETA"
},
[7] {
"ct_city_nm" => "SAN NICOLAS"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:42:00.937Z
}
{
"co_ctry_cd" => "BB",
"cities" => [
[ 0] {
"ct_city_nm" => "BRIDGETOWN"
},
[ 1] {
"ct_city_nm" => "ST ANDREW"
},
[ 2] {
"ct_city_nm" => "ST GEORGE"
},
[ 3] {
"ct_city_nm" => "CHRIST CHURCH"
},
[ 4] {
"ct_city_nm" => "ST JAMES"
},
[ 5] {
"ct_city_nm" => "ST JOSEPH"
},
[ 6] {
"ct_city_nm" => "ST LUCY"
},
[ 7] {
"ct_city_nm" => "ST JOHN"
},
[ 8] {
"ct_city_nm" => "ST MICHAEL"
},
[ 9] {
"ct_city_nm" => "ST PHILIP"
},
[10] {
"ct_city_nm" => "ST THOMAS"
},
[11] {
"ct_city_nm" => "ST PETER"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:42:19.933Z
}
[2019-12-24T14:42:31,068][INFO ][logstash.inputs.jdbc ][main] (0.359830s) select distinct co.co_ctry_cd, ct_city_nm from city ct
inner join country co on co.co_ctry_lk = ct.co_ctry_lk AND (co.CO_REC_EFFECT_DT <= today AND(co.CO_REC_EXPIRY_DT >= today OR co.CO_REC_EXPIRY_DT IS NULL))
where (ct.CT_REC_EFFECT_DT <= today AND(ct.CT_REC_EXPIRY_DT >= today OR ct.CT_REC_EXPIRY_DT IS NULL))
and ct.co_ctry_lk in(111,113) order by co.co_ctry_cd
ITERATION #4 - Multiple country AW/BB events, results not aggregated correctly
{
"co_ctry_cd" => "AW",
"cities" => [
[0] {
"ct_city_nm" => "ARUBA"
},
[1] {
"ct_city_nm" => "ANGOCHI"
},
[2] {
"ct_city_nm" => "ORANJESTAD"
},
[3] {
"ct_city_nm" => "SAN NICOLAS"
},
[4] {
"ct_city_nm" => "NOORD"
},
[5] {
"ct_city_nm" => "PARADERA"
},
[6] {
"ct_city_nm" => "SAVANETA"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:42:31.183Z
}
{
"co_ctry_cd" => "BB",
"cities" => [
[0] {
"ct_city_nm" => "CHRIST CHURCH"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:42:31.184Z
}
{
"co_ctry_cd" => "AW",
"cities" => [
[0] {
"ct_city_nm" => "SANTA CRUZ"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:42:31.184Z
}
{
"co_ctry_cd" => "BB",
"cities" => [
[ 0] {
"ct_city_nm" => "BRIDGETOWN"
},
[ 1] {
"ct_city_nm" => "ST GEORGE"
},
[ 2] {
"ct_city_nm" => "ST JOHN"
},
[ 3] {
"ct_city_nm" => "ST ANDREW"
},
[ 4] {
"ct_city_nm" => "ST JAMES"
},
[ 5] {
"ct_city_nm" => "ST LUCY"
},
[ 6] {
"ct_city_nm" => "ST PETER"
},
[ 7] {
"ct_city_nm" => "ST JOSEPH"
},
[ 8] {
"ct_city_nm" => "ST MICHAEL"
},
[ 9] {
"ct_city_nm" => "ST THOMAS"
},
[10] {
"ct_city_nm" => "ST PHILIP"
}
],
"@version" => "1",
"@timestamp" => 2019-12-24T06:42:44.955Z
}......

If you have pipeline.workers set to 4, then you have 4 separate aggregates, each potentially aggregating a subset of the events. You have to set pipeline.workers to 1, and if your aggregation depends on event ordering (which I don't think yours does) then you must also disable the new java execution engine.

2 Likes

Thanks, after setting pipeline.workers: 1 it is now aggregating correctly.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.