Testing aggregates => LogStash::ConfigurationError: Aggregate plugin: more than one filter which defines timeout options. But only defining once

I'm trying to test my Logstash configurations that use the aggregate plugin. I have not had any issues running them locally against the binary as I write them, but now I want to test the expected behavior of the aggregations. I found a blog post and a Docker container that use rspec to test filters. However, when I wrote a test to check the value of eventId, this ConfigurationError popped up. I'm surprised, because as far as I can tell I'm only defining the timeout options in one aggregate block. But perhaps I'm doing something terribly wrong.

filter.conf

filter {
  dissect {
    mapping => {
      "message" => "%{emTimestamp}|%{}%{}| [%{eventId}] %{messageType}: %{msg}"
    }
  }

  if [messageType] == "GET Request" {
    aggregate {
      task_id => "%{eventId}"
      code => "
        map['eventId'] ||= event.get('eventId')
        map['emTimestamp'] ||= event.get('emTimestamp')
        map['actionType'] ||= 'aggregation'
        map['request'] = {'clientSource' => event.get('clientSource'), 'method' => event.get('method'), 'urlPath'=> event.get('urlPath')}"
    }
  }

  if [messageType] == "Response" {
    mutate { gsub => [ "msg","[\\]",""] }
    kv {
      source => "msg"
      value_split => "="
      field_split => "&?&"
      trim_key => "\s"
      trim_value => "\r"
      remove_field => ["msg", "message"]
    }

    json {
      source => ['body']
      target => "parsed"
    }

    ruby {
      code => "mapped = event.get('parsed').map {
        |h|
          {
            'obTimestamp' => h['item']['assets']['timestamp'],
            'fieldId'     => h['item']['assets']['reference']['fieldId'],
            'eventId'     => h['item']['assets']['reference']['eventId'],
            'userId'      => h['item']['assets']['reference']['userId'],
            'startTs'     => h['item']['attributes']['startTs'],
            'endTs'       => h['item']['attributes']['endTs']
          }
      }; event.set('mapped',mapped)"
    }

    aggregate {
      task_id => "%{eventId}"
      code => "
        map['eventId'] ||= event.get('eventId')
        map['emTimestamp'] ||= event.get('emTimestamp')
        map['actionType'] ||= 'aggregation'
        map['response'] = {'body' => event.get('mapped')}"
    }
  }

  if [messageType] == "ACK Response" {
    kv {
      source => "msg"
      value_split => "="
      field_split => "&,&"
      trim_key => "\s"
      trim_value => "\r"
      remove_field => ["msg"]
    }

    aggregate {
      task_id => "%{eventId}"
      code => "
        map['eventId'] ||= event.get('eventId')
        map['emTimestamp'] ||= event.get('emTimestamp')
        map['actionType'] ||= 'aggregation'
      "
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "eventId"
      timeout => 60
    }
  }
}

test_spec.rb

require "logstash/devutils/rspec/spec_helper"

# Load the configuration file
@@configuration = String.new
@@configuration << File.read("conf/filter.conf")

describe "Test ACK response type" do

  config(@@configuration)

  # Inject input event/message into the pipeline
  message = File.read("/opt/logstash/logs/tc.log")
  sample("message" => message, "type" => "") do
    # Check the output event/message properties
    insist { subject.get("eventId") } == "104"
    reject { subject.get("tags").include?("_grokparsefailure") }
    reject { subject.get("tags").include?("_dateparsefailure") }
  end
end

error message and test failure

Failures:

  1) Test ACK response type "{"message":"10/23/2021 02:23:23 PM|DEBUG|HttpUtils ..." when processed
     Failure/Error: insist { subject.get("eventId") } == "104"

     LogStash::ConfigurationError:
       Aggregate plugin: For task_id pattern '%{eventId}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern. Timeout options are : timeout, inactivity_timeout, timeout_code, push_map_as_event_on_timeout, push_previous_map_as_event, timeout_timestamp_field, timeout_task_id_field, timeout_tags
     # ./vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:103:in `block in register'
     # ./vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.9.1/lib/logstash/filters/aggregate.rb:97:in `register'
     # org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:75:in `register'
     # ./logstash-core/lib/logstash/pipeline.rb:288:in `register_plugin'
     # ./logstash-core/lib/logstash/pipeline.rb:299:in `block in register_plugins'
     # ./logstash-core/lib/logstash/pipeline.rb:299:in `register_plugins'
     # ./logstash-core/lib/logstash/pipeline.rb:660:in `maybe_setup_out_plugins'
     # ./logstash-core/lib/logstash/pipeline.rb:549:in `filter'
     # ./vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/lib/logstash/devutils/rspec/logstash_helpers.rb:54:in `block in results'
     # ./vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/lib/logstash/devutils/rspec/logstash_helpers.rb:52:in `block in results'
     # ./vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/lib/logstash/devutils/rspec/logstash_helpers.rb:68:in `block in subject'
     # ./rspec-tests/test_spec.rb:15:in `block in <main>'
     # ./vendor/bundle/jruby/2.5.0/gems/insist-1.0.0/lib/insist.rb:47:in `value'
     # ./vendor/bundle/jruby/2.5.0/gems/insist-1.0.0/lib/insist/comparators.rb:13:in `=='
     # ./rspec-tests/test_spec.rb:15:in `block in <main>'
     # ./vendor/bundle/jruby/2.5.0/gems/rspec-wait-0.0.9/lib/rspec/wait.rb:46:in `block in <main>'
     # ./lib/bootstrap/rspec.rb:31:in `<main>'

Finished in 0.76796 seconds (files took 1.54 seconds to load)
1 example, 1 failure

Failed examples:

rspec ./rspec-tests/test_spec.rb:13 # Test ACK response type "{"message":"10/23/2021 02:23:23 PM|DEBUG|HttpUtils ..." when processed

tc.log

10/23/2021 02:23:23 PM|DEBUG|HttpUtils                              | [104] Response: body =[{},"assets":{},{}],},"part":0}]

To reproduce, launch this Docker container and follow its instructions to volume mount the filter, spec, and tc.log so the test executes: iteratec/logstash-rspec on GitHub (Docker image to run rspecs testing logstash filter configurations).

docker command, populate paths.

    docker run --rm --name logstash-rspecs \
        -v /<path to filter dir>:/opt/logstash/filters-under-test \
        -v /<path to spec dir>:/opt/logstash/rspec-tests  \
        -v /<path to log dir>:/opt/logstash/logs \
        iteratec/logstash-rspec:7.3.0

Hi,
I'm also in the process of testing aggregate filters and... well... the rspec for 7.x.x is broken in that it double inits, which causes this issue. I managed to update the rspec to 8.x.x in a dirty way, but it works well enough to fix the error (or be patient and wait for 8.x.x to be released).

# Update logstash devutils in a dirty way... is officially supported from logstash 8.0
# A hack a day keeps my unemployment at bay
/opt/logstash/bin/ruby -S gem update logstash-devutils --ignore-dependencies
rm -rf /opt/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/
mv /opt/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-devutils-2.2.1-java/ /opt/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-devutils-1.3.6-java/

*I'm not a ruby developer at all, so if the hack could be made less hacky, then be my guest and point out the better way to do things :)
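
For context on why a double init would produce exactly this error: judging by the stack trace, the check runs inside the filter's `register`, and it appears to rely on state that outlives a single filter instance. The following is a toy model in plain Ruby of that idea, assuming the "already has timeout options" bookkeeping is shared per task_id pattern. It is not the plugin's actual code; `ToyAggregate`, `REGISTRY`, and `build_pipeline` are made-up names for illustration.

```ruby
# Toy model only: ToyAggregate, REGISTRY and build_pipeline are hypothetical
# names, not part of logstash-filter-aggregate or logstash-devutils.
class ToyAggregate
  # Shared by every instance, the way class-level plugin state would be.
  REGISTRY = {}

  def initialize(task_id, timeout: nil)
    @task_id = task_id
    @timeout = timeout
  end

  def register
    # Filters without timeout options never conflict with each other.
    return self if @timeout.nil?

    if REGISTRY.key?(@task_id)
      raise ArgumentError,
            "more than one filter defines timeout options for '#{@task_id}'"
    end
    REGISTRY[@task_id] = self
    self
  end
end

# Three aggregate blocks from one config; only the last one has a timeout.
def build_pipeline
  [
    ToyAggregate.new("%{eventId}"),
    ToyAggregate.new("%{eventId}"),
    ToyAggregate.new("%{eventId}", timeout: 60)
  ].each(&:register)
end

build_pipeline # first init of the config: no conflict

begin
  build_pipeline # second init of the same config in the same process
rescue ArgumentError => e
  puts "second init failed: #{e.message}"
end
```

In this model the config itself is valid (one block with timeout options), yet registering it twice without clearing the shared state raises the conflict, which would match the symptom described in the question.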

Anyway, you're not quite out of the woods yet, I think. I'm currently investigating how to write a test that includes aggregate timeouts, and thus far my efforts are... well... not going great (and not being a ruby developer does not speed up my research).


Thanks for elucidating the issue in logstash-devutils! I have had some success with logstash-filter-verifier but it's clunky. I have been having trouble writing aggregates using the s3 output plugin. Are you going to write to s3?

You are welcome!
Meanwhile, I did find a way to make the timeout issue "go away", or at least to work around it: use the timeout_timestamp_field option in the aggregate filter and set it to "@timestamp". This way, the timeout is calculated from the timestamps of the messages themselves.

I am, however, unsure what this does to actual "timeouts": if there is never a new message, perhaps the latest aggregated data would never be sent. That is something I'll need to verify in the "real world", but for my tests it is something I can work with. It does imply that a dummy message has to be sent in order for time to progress. If you know a better way, please let me know!
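
To make the dummy-message point concrete, here is a toy model in plain Ruby of a timeout driven by event timestamps instead of the wall clock. It is a sketch of the idea only, assuming expiry is checked when a new event arrives; it is not the aggregate plugin's code, and `EventTimeAggregator` with its `process` and `pending` methods are names invented for this example.

```ruby
# Toy model only: EventTimeAggregator is a hypothetical class, not plugin code.
class EventTimeAggregator
  def initialize(timeout)
    @timeout = timeout # seconds, measured in event time
    @maps = {}         # task_id => { started: event time, data: fields so far }
  end

  # Feed one event. Returns the maps that expired as of this event's timestamp.
  def process(task_id, timestamp, fields = {})
    expired = flush(timestamp)
    entry = (@maps[task_id] ||= { started: timestamp, data: {} })
    entry[:data].merge!(fields)
    expired
  end

  # Task ids that are still being aggregated.
  def pending
    @maps.keys
  end

  private

  # Expire every task whose first event is older than the timeout,
  # measured against the timestamp of the event being processed.
  def flush(now)
    old, @maps = @maps.partition { |_, e| now - e[:started] > @timeout }.map(&:to_h)
    old.map { |id, e| e[:data].merge("eventId" => id) }
  end
end

agg = EventTimeAggregator.new(60)
agg.process("104", 0, "actionType" => "aggregation")

# No later event yet: task 104 stays pending no matter how long we wait.
p agg.pending

# A dummy event 61 seconds later (in event time) pushes task 104 out.
p agg.process("dummy", 61)
```

In this model nothing is ever flushed without a later event, which is the same concern raised above about the last aggregated data possibly never being sent.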

My favorite solution would be to simply mock out part of the aggregate filter, but so far I haven't gotten anywhere with that, and I'm no ruby programmer at all :(


I ditched my attempt to get logstash-rspec working. Not sure I will head back to it, since I performed the testing using the filter-verifier. But you have made it further than me. I've also resolved my S3 issues. But if you end up publishing some example tests with the aggregate working with logstash-rspec I'd like to see it.

Hi, the logstash-filter-verifier project looks promising!

I see there's a possibility to mock filters; I assume this is what you used? Do you have any example of how this works? Did you use timeouts?

So far I managed to refactor my original tests to this. I did notice there's no JUnit XML output at this moment in time. Do you use this test suite in CI environments?

Sorry for the many questions but it could be helpful for me and potentially others

There's an example and some discussion here:

I opened a few other issues

I ended up setting a timeout of 1 second and testing very small, unit-like aggregates. I found that I needed to describe all the events that passed through the pipeline, not only the aggregates. With more time I probably could have found a better solution.

I would encourage you to extend the example or find more elegant ways of testing and perhaps contribute as an example to the filter verifier examples, or at least share somewhere.

Also providing feedback regarding the testing on the open issues or creating another one might help make a better testing framework for the aggregation events.
