I'm trying to test my logstash configurations that use the aggregate plugin. I have not had any issues running them locally against the binary as I write them. But now I want to test the behavior expectations of the aggregations. Then I found a blog post and docker container that utilize rspec to test the filters. However I was trying to write a test to check the value of eventId and this ConfigurationError pops up. I'm surprised because it seems to me I'm only defining the timeout option in one aggregate block. But perhaps I'm doing something terribly wrong.
filter.conf
fillter{
dissect {
mapping => {
"message" => "%{emTimestamp}|%{}%{}| [%{eventId}] %{messageType}: %{msg}"
}
}
if [messageType] == "GET Request" {
aggregate {
task_id => "%{eventId}"
code => "
map['eventId'] ||= event.get('eventId')
map['emTimestamp'] ||= event.get('emTimestamp')
map['actionType'] ||= 'aggregation'
map['request'] = {'clientSource' => event.get('clientSource'), 'method' => event.get('method'), 'urlPath'=> event.get('urlPath')}"
}
}
if [messageType] == "Response" {
mutate { gsub => [ "msg","[\\]",""] }
kv {
source => "msg"
value_split => "="
field_split => "&?&"
trim_key => "\s"
trim_value => "\r"
remove_field => ["msg", "message"]
}
json {
source => ['body']
target => "parsed"
}
ruby {
code => "mapped = event.get('parsed').map {
|h|
{
'obTimestamp' => h['item']['assets']['timestamp'],
'fieldId' => h['item']['assets']['reference']['fieldId'],
'eventId' => h['item']['assets']['reference']['eventId'],
'userId' => h['item']['assets']['reference']['userId'],
'startTs' => h['item']['attributes']['startTs'],
'endTs' => h['item']['attributes']['endTs']
}
}; event.set('mapped',mapped)"
}
aggregate {
task_id => "%{eventId}"
code => "
map['eventId'] ||= event.get('eventId')
map['emTimestamp'] ||= event.get('emTimestamp')
map['actionType'] ||= 'aggregration'
map['response'] = {'body' => event.get('mapped')}"
}
}
if [messageType] == "ACK Response" {
kv {
source => "msg"
value_split => "="
field_split => "&,&"
trim_key => "\s"
trim_value => "\r"
remove_field => ["msg"]
}
aggregate {
task_id => "{eventId}"
code => "
map['eventId'] ||= event.get('eventId')
map['emTimestamp'] ||= event.get('emTimestamp')
map['actionType'] ||= 'aggregration'
"
push_map_as_event_on_timeout => true
timeout_task_id_field => "eventId"
timeout => 60
}
}
}
test_spec.rb
require "logstash/devutils/rspec/spec_helper"
# Load the configuration file
@@configuration = String.new
@@configuration << File.read("conf/filter.conf")
describe "Test ACK response type" do
config(@@configuration)
# Inject input event/message into the pipeline
message = File.read("/opt/logstash/logs/tc.log")
sample("message" => message, "type" => "") do
# Check the ouput event/message properties
insist { subject.get("transactionId") } == 104
reject { subject.get("tags").include?("_grokparsefailure") }
reject { subject.get("tags").include?("_dateparsefailure") }
end
end
error message and test failure
Failures:
1) Test ACK response type "{"message":"10/23/2021 02:23:23 PM|DEBUG|HttpUtils ..." when processed
Failure/Error: insist { subject.get("eventId") } == "104"
LogStash::ConfigurationError:
Aggregate plugin: For task_id pattern '%{eventId}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern. Timeout options are : timeout, inactivity_timeout, timeout_code, push_map_as_event_on_timeout, push_previous_map_as_event, timeout_timestamp_field, timeout_task_id_field, timeout_tags
# ./vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:103:in `block in register'
# ./vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:97:in `register'
# org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:75:in `register'
# ./logstash-core/lib/logstash/pipeline.rb:288:in `register_plugin'
# ./logstash-core/lib/logstash/pipeline.rb:299:in `block in register_plugins'
# ./logstash-core/lib/logstash/pipeline.rb:299:in `register_plugins'
# ./logstash-core/lib/logstash/pipeline.rb:660:in `maybe_setup_out_plugins'
# ./logstash-core/lib/logstash/pipeline.rb:549:in `filter'
# ./vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/lib/logstash/devutils/rspec/logstash_helpers.rb:54:in `block in results'
# ./vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/lib/logstash/devutils/rspec/logstash_helpers.rb:52:in `block in results'
# ./vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/lib/logstash/devutils/rspec/logstash_helpers.rb:68:in `block in subject'
# ./rspec-tests/test_spec.rb:15:in `block in <main>'
# ./vendor/bundle/jruby/2.5.0/gems/insist-1.0.0/lib/insist.rb:47:in `value'
# ./vendor/bundle/jruby/2.5.0/gems/insist-1.0.0/lib/insist/comparators.rb:13:in `=='
# ./rspec-tests/test_spec.rb:15:in `block in <main>'
# ./vendor/bundle/jruby/2.5.0/gems/rspec-wait-0.0.9/lib/rspec/wait.rb:46:in `block in <main>'
# ./lib/bootstrap/rspec.rb:31:in `<main>'
Finished in 0.76796 seconds (files took 1.54 seconds to load)
1 example, 1 failure
Failed examples:
rspec ./rspec-tests/test_spec.rb:13 # Test ACK response type "{"message":"10/23/2021 02:23:23 PM|DEBUG|HttpUtils ..." when processed
tc.log
10/23/2021 02:23:23 PM|DEBUG|HttpUtils | [104] Response: body =[{},"assets":{},{}],},"part":0}]
To reproduce launch this docker container and follow instructions to volume mount the filter, spec, and tc.log so the text executes. GitHub - iteratec/logstash-rspec: Docker image to run rspecs testing logstash filter configurations.
docker command, populate paths.
docker run --rm --name logstash-rspecs \
-v /<path to filter dir>:/opt/logstash/filters-under-test \
-v /<path to spec dir>:/opt/logstash/rspec-tests \
-v /<path to log dir>:/opt/logstash/logs \
iteratec/logstash-rspec:7.3.0