Aggregate filter plugin: how do I construct the filter?

Hello!

I'm trying to aggregate some data from a MS SQL database by using Logstash.
Below is a view of the database.

The jdbc input and my aggregate filter look like this:

    jdbc {
        ....
        statement => "
            select
                d.Id,
                u.FirstName
            from Document d
            full join Approvers a
                on d.Id = a.DocumentId
            full join [User] u
                on a.UserId = u.Id
        "
    }
    filter {
        aggregate {
            task_id => "%{d.Id}"
            code => "
                map['document_id'] ||= event.get('d.Id')
                map['approvers'] ||= []
                map['approvers'] << {'first_name' => event.get('u.FirstName')}
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 3
        }
    }

If I execute the statement from the jdbc input above directly in the database, I get this data:

| Id | FirstName |
| 1  | Nils      |
| 2  | Rudolf    |
| 2  | Olle      |
| 3  | NULL      |

And I would like the aggregated data to look something like this:

[
    {
        "document_id": "1",
        "approvers": [
            { "first_name": "Nils" }
        ]
    },
    {
        "document_id": "2",
        "approvers": [
            { "first_name": "Rudolf" },
            { "first_name": "Olle" }
        ]
    },
    {
        "document_id": "3",
        "approvers": []
    }
]

Any clue on how to fix it? Am I missing something?

Thank you!

Why do you have that space inside the {} ?

@Badger Just a typo for this example, fixed now.
Still got the same error.

Hi

What do you get if you run your config? Can you show us the actual output you are getting from stdout{}?

Are you getting any error messages in your logs? Or is it just that the output you get is not what you need?

Have you made sure your pipeline has only one worker? This is a requirement for the aggregate{} filter.

Hope this helps.

Hi

I get no errors when running the config and the config runs with -w 1.
It feels like the filter isn't doing anything at all. Does Logstash recognize fields with a dot like Document.Id / d.Id?
One thing I'm really unsure about is the task_id: which id should I use when the query uses joins?

The result is below:

1 hit
{
  "_index": "randomindex",
  "_type": "_doc",
  "_id": "OFpE7G8BDQZMTmIc_gTh",
  "_score": 1,
  "_source": {
    "@version": "1",
    "first_name": "Nils",
    "document_id": 1,
    "@timestamp": "2020-01-28T13:09:02.005Z"
  },
  "fields": {
    "@timestamp": [
      "2020-01-28T13:09:02.005Z"
    ]
  }
}
2 hit
{
  "_index": "randomindex",
  "_type": "_doc",
  "_id": "OVpE7G8BDQZMTmIc_gTh",
  "_score": 1,
  "_source": {
    "@version": "1",
    "first_name": "Olle",
    "document_id": 2,
    "@timestamp": "2020-01-28T13:09:01.994Z"
  },
  "fields": {
    "@timestamp": [
      "2020-01-28T13:09:01.994Z"
    ]
  }
}
3 hit
{
  "_index": "randomindex",
  "_type": "_doc",
  "_id": "N1pE7G8BDQZMTmIc_gTh",
  "_score": 1,
  "_source": {
    "@version": "1",
    "first_name": null,
    "document_id": 3,
    "@timestamp": "2020-01-28T13:09:02.006Z"
  },
  "fields": {
    "@timestamp": [
      "2020-01-28T13:09:02.006Z"
    ]
  }
}
4 hit
{
  "_index": "randomindex",
  "_type": "_doc",
  "_id": "OlpE7G8BDQZMTmIc_wRi",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@version": "1",
    "first_name": "Rudolf",
    "document_id": 2,
    "@timestamp": "2020-01-28T13:09:02.005Z"
  },
  "fields": {
    "@timestamp": [
      "2020-01-28T13:09:02.005Z"
    ]
  }
}

Hi

I think you should use document_id as task_id in your aggregate{} filter (task_id => "%{document_id}").

Try commenting out the code section for now, or leaving it empty (code => "").

Your timeout is only 3 seconds. Try increasing it.

Since you do not have beginning or end events, you should probably use timeout_task_id_field => "task_id" and set a value for inactivity_timeout.

Hope this helps
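
To make the task_id suggestion concrete, here is a minimal sketch in plain Ruby, run outside Logstash, of the grouping the filter is being asked to do. It assumes the events carry the fields document_id and first_name, as in the indexed documents shown above. The helpers fold_row and aggregate_rows are hypothetical names, and the "emit the previous map when the task id changes" logic is only a simplified reading of push_previous_map_as_event, not the plugin's actual code. The nil check is an addition so that a document with no approvers ends up with an empty list.

```ruby
require 'json'

# Mirrors the body of the filter's code option: fold one row into the task's map.
def fold_row(map, row)
  map['document_id'] ||= row['document_id']
  map['approvers'] ||= []
  # Skip NULL names so a document without approvers keeps an empty list.
  map['approvers'] << { 'first_name' => row['first_name'] } unless row['first_name'].nil?
end

# Hypothetical helper: emit the previous map whenever the task id changes
# (a simplified stand-in for push_previous_map_as_event).
def aggregate_rows(rows)
  emitted = []
  current_id = nil
  map = nil
  rows.each do |row|
    task_id = row['document_id']
    if task_id != current_id
      emitted << map unless map.nil?
      map = {}
      current_id = task_id
    end
    fold_row(map, row)
  end
  emitted << map unless map.nil? # stands in for the final flush on timeout
  emitted
end

SORTED_ROWS = [
  { 'document_id' => 1, 'first_name' => 'Nils' },
  { 'document_id' => 2, 'first_name' => 'Rudolf' },
  { 'document_id' => 2, 'first_name' => 'Olle' },
  { 'document_id' => 3, 'first_name' => nil }
].freeze

# Same rows, but the two rows for document 2 are no longer adjacent.
UNSORTED_ROWS = [SORTED_ROWS[0], SORTED_ROWS[2], SORTED_ROWS[3], SORTED_ROWS[1]].freeze

puts JSON.pretty_generate(aggregate_rows(SORTED_ROWS))
```

With the rows in id order the sketch emits three maps, one per document. With UNSORTED_ROWS it emits four, because document 2 is split in two. If the plugin behaves the same way, adding an ORDER BY on the document id to the SQL statement is probably worth trying as well.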

In addition to '-w 1' you will need to disable pipeline.java_execution.
