Logstash filter aggregate with 2 nested fields

Good evening. Could you help me with the problem below?
I have the MySQL table below:

productId     productName     categoryId     categoryName     locationId     locationPrice
------------------------------------------------------------------------------------------
1             Product1        1              Category1        1              100
1             Product1        1              Category1        2              200
2             Product2        1              Category1        1              100
2             Product2        1              Category1        2              200
2             Product2        2              Category2        1              100
2             Product2        2              Category2        2              200
2             Product2        3              Category3        3              300

and I have the following logstash code:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => ""
    jdbc_user => ""
    jdbc_password => ""
    jdbc_paging_enabled => true
    jdbc_paging_mode => "explicit"
    jdbc_page_size => 1000
    tracking_column => "updatedat"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/1 * * * *"
    statement => "SELECT
        a.productId,
        a.productName,
        b.categoryId,
        b.categoryName,
        c.locationId,
        c.locationPrice
FROM Product AS a
INNER JOIN Category AS b ON a.categoryId = b.id
INNER JOIN Location AS c ON a.locationId = c.id"
  }
}
filter {
  aggregate {
    task_id => "%{productId}"
    code => "
      map['productId'] ||= event.get('productId')
      map['productName'] ||= event.get('productName')

      map['categories'] ||= []

      unless map['categories'].any? { |cat| cat['categoryId'] == event.get('categoryId') }
        map['categories'] << {
          'categoryId' => event.get('categoryId'),
          'categoryName' => event.get('categoryName')
        }
      end

      map['locations'] ||= []

      #unless map['locations'].any? { |loc| loc['locationId'] == event.get('locationId') }
        map['locations'] << {
          'locationId' => event.get('locationId'),
          'locationPrice' => event.get('locationPrice')
        }
      #end

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
output {
  stdout { codec => "rubydebug" }
}
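For reference, here is a minimal sketch, in plain Ruby outside Logstash, of what the aggregate `code` block above is meant to do: fold the joined rows into one map per `productId`, de-duplicating categories by `categoryId` (and, if the commented-out guard were enabled, locations by `locationId`). The sample rows mirror product 1 from the table above; `rows` and `maps` are illustrative names, not part of the plugin.

```ruby
# Simulate the aggregate filter's per-task_id map for product 1.
rows = [
  { 'productId' => 1, 'productName' => 'Product1', 'categoryId' => 1,
    'categoryName' => 'Category1', 'locationId' => 1, 'locationPrice' => 100 },
  { 'productId' => 1, 'productName' => 'Product1', 'categoryId' => 1,
    'categoryName' => 'Category1', 'locationId' => 2, 'locationPrice' => 200 }
]

maps = {}
rows.each do |event|
  # One map per task_id, as the aggregate filter keeps internally.
  map = (maps[event['productId']] ||= {})
  map['productId']   ||= event['productId']
  map['productName'] ||= event['productName']

  # Append each category only once.
  map['categories'] ||= []
  unless map['categories'].any? { |cat| cat['categoryId'] == event['categoryId'] }
    map['categories'] << { 'categoryId'   => event['categoryId'],
                           'categoryName' => event['categoryName'] }
  end

  # Append every location row (the de-dup guard is disabled in the question).
  map['locations'] ||= []
  map['locations'] << { 'locationId'    => event['locationId'],
                        'locationPrice' => event['locationPrice'] }
end

puts maps.values.inspect
```

Run sequentially over the rows, this yields one document with one category and two locations for product 1, which is the expected output the question describes.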

and I would like to obtain the following result:

[
  {
    "productId": 1,
    "productName": "Product1",
    "categories": [
      {
        "categoryId": 1,
        "categoryName": "Category1"
      }
    ],
    "locations": [
      {
        "locationId": 1,
        "locationPrice": 100
      },
      {
        "locationId": 2,
        "locationPrice": 200
      }
    ]
  },
  {
    "productId": 2,
    "productName": "Product2",
    "categories": [
      {
        "categoryId": 1,
        "categoryName": "Category1"
      },
      {
        "categoryId": 2,
        "categoryName": "Category2"
      },
      {
        "categoryId": 3,
        "categoryName": "Category3"
      }
    ],
    "locations": [
      {
        "locationId": 1,
        "locationPrice": 100
      },
      {
        "locationId": 2,
        "locationPrice": 200
      },
      {
        "locationId": 3,
        "locationPrice": 300
      }
    ]
  }
]

but I'm getting the following result:

[
  {
    "productId": 1,
    "productName": "Product1",
    "categories": [
      {
        "categoryId": 1,
        "categoryName": "Category1"
      }
    ],
    "locations": [
      {
        "locationId": 1,
        "locationPrice": 100
      }
    ]
  },
  {
    "productId": 2,
    "productName": "Product2",
    "categories": [
      {
        "categoryId": 1,
        "categoryName": "Category1"
      },
      {
        "categoryId": 2,
        "categoryName": "Category2"
      },
      {
        "categoryId": 3,
        "categoryName": "Category3"
      }
    ],
    "locations": [
      {
        "locationId": 1,
        "locationPrice": 100
      },
      {
        "locationId": 2,
        "locationPrice": 200
      },
      {
        "locationId": 3,
        "locationPrice": 300
      }
    ]
  }
]

Note that for product 1 the locations array is truncated to the same length as the categories array.
Does anyone know what the problem is and how to solve it?

If you use that jdbc input, remove the aggregate filter, and add an output with a json codec, then what do the events look like? I'm not going to build a database instance to reproduce your issue, but I can read JSON from a file to do so.
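For that kind of debugging, the output stage could be swapped for something like the following sketch, which uses the standard file output with the json_lines codec (the path is illustrative):

```
output {
  file {
    path => "/tmp/jdbc-events.json"
    codec => "json_lines"
  }
}
```

Each raw JDBC row then lands as one JSON object per line, which can later be replayed through a file input to reproduce the aggregation without a database.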

Unfortunately, I couldn't reproduce it with the example I showed.
My real data is much larger, but it has the same structure as this example; I simplified it to make the problem easier to understand.
It seems to me that events for the same task_id are being processed in parallel, so one thread overwrites the other's map. That would explain why the final record sometimes has one entry in the categories array and, after I run it again, a different one.
I'm new to Logstash and don't understand it very well yet. Could this be what is happening? Is there any way to guarantee a single execution per task_id until processing finishes?

Are you setting pipeline.workers to 1, as the documentation says you must? Otherwise different threads may process different subsets of the data on different runs.
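The aggregate filter's documentation requires a single worker thread so that all events sharing a task_id are processed by the same aggregate map. A minimal way to set that, either globally in logstash.yml or on the command line:

```yaml
# logstash.yml -- force a single pipeline worker so the aggregate
# filter sees every event for a given task_id in one thread
pipeline.workers: 1
```

Equivalently, `bin/logstash -w 1 -f your-pipeline.conf` sets the same option for a single run. Note this trades away parallelism for the whole pipeline, so it is best confined to the pipeline that runs the aggregate filter.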


The problem was resolved after setting pipeline.workers to 1.
Thank you very much.
