Data not getting synced properly from SQL to Elasticsearch

I am syncing data from SQL to Elasticsearch. In the code block of my aggregate filter I append each row to a map, e.g. Features. When I run the bulk SQL query, not all features are appended for each index. For example, if index 1 has 10 features, only 2 or 3 are ingested into Elasticsearch. When I specify the id directly with = in the SQL query, all 10 features are synced. If I run the bulk query separately, it also returns the correct count.

What could be the potential issue? I have tried multiple things, but my map is not being updated with the full set of features for each hotel ID.

filter {
    aggregate {
      task_id => "%{HotelCode}"
      code => "
        map['HotelCode'] ||= event.get('HotelCode')

        map['Features'] ||= []

        map['Features'] << {
            'FeatureId' => event.get('FeatureId'),
            'FeatureIdName' => event.get('FeatureIdName'),
            'Group' => event.get('GroupName'),
            'Rank' => event.get('Rank')
        }
      "
      push_previous_map_as_event => true
      timeout => 5
    }
}

Could this be because of your timeout => 5?

timeout

  • Value type is number
  • Default value is 1800

The amount of seconds (since the first event) after which a task is considered as expired.

When timeout occurs for a task, its aggregate map is evicted.

If push_map_as_event_on_timeout or push_previous_map_as_event is set to true, the task aggregation map is pushed as a new Logstash event.

Timeout can be defined for each "task_id" pattern.

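If your SQL query returns a large number of rows, then with timeout set to 5 each task's map is held in memory for aggregation for only 5 seconds after its first event. Rows for the same HotelCode that arrive after that would no longer be added to the map that was already pushed, which would leave the indexed document with only some of its features.

Specifying a single ID returns far fewer rows, so it is much less likely to hit the timeout.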

Are you running this pipeline with pipeline.workers set to 1 as well?
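
As a rough sketch of what I mean, not a tested fix: the filter below is your own config with a longer timeout. The value 120 is only an assumed placeholder, so size it to how long your bulk query actually takes to stream one hotel's rows. The event.cancel() call is also an assumption on my part, borrowed from the aggregate plugin's documented examples, and it drops the individual row events so that only the aggregated map is indexed. Note too that push_previous_map_as_event is documented to work correctly only when all events for one task arrive one after the other, so the bulk query would need to return rows grouped by HotelCode (for example with an ORDER BY on that column).

```
filter {
    aggregate {
      task_id => "%{HotelCode}"
      code => "
        map['HotelCode'] ||= event.get('HotelCode')
        map['Features'] ||= []
        map['Features'] << {
            'FeatureId' => event.get('FeatureId'),
            'FeatureIdName' => event.get('FeatureIdName'),
            'Group' => event.get('GroupName'),
            'Rank' => event.get('Rank')
        }
        # Assumption: drop the per-row event so only the aggregated map is indexed.
        event.cancel()
      "
      # Pushes the previous map whenever a new HotelCode is seen,
      # so rows must arrive grouped by HotelCode.
      push_previous_map_as_event => true
      # Assumed value: mainly needed to flush the last hotel's map
      # once the query has finished.
      timeout => 120
    }
}
```

The worker count is set outside the filter block, either with pipeline.workers: 1 in logstash.yml or with the -w 1 command-line flag. With more than one worker, rows for the same HotelCode can be processed out of order and end up in separate maps.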
