Logstash aggregate filter use with pagination

I have a Logstash pipeline that:

  • fetches data from MySQL using the JDBC input connector
  • aggregates data for users based on user ID
  • pushes the aggregated data to an Elasticsearch cluster

It fetches a large amount of data (e.g. 2 million rows) from the MySQL server and uses cursor fetch with a "jdbc_fetch_size" of 100000 so that it does not load all the rows at once (this is the JDBC fetch size, not pagination via page size or limit + offset) and thus avoids an out-of-memory exception.

Below is my configuration:


input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://host/dbname?useCursorFetch=true"
    jdbc_fetch_size => "100000"
    jdbc_user => ""
    jdbc_password => ""
    statement => "Select *
        from users_list
        left join user_posts on users_list.id = user_posts.user_id
        left join user_friends on users_list.id = user_friends.user_id
        order by users_list.id;"
  }
}

filter {
  aggregate {
       task_id => "%{id}" #id of user
       code => "
         map['user_id'] ||= event.get('id')
         map['name'] ||= event.get('name')
         map['email'] ||= event.get('email')
                  
         if (event.get('post_id') != nil)
                map['posts'] << {
                'id' => event.get('post_id'),
                'title' => event.get('post_title'),
                'description' => event.get('post_description')}
          
         end

         if (event.get('friend_id') != nil)
                map['friends'] << {
                'id' => event.get('friend_id'),
                'name' => event.get('friend_name')}
         end

         event.cancel()
       "
       push_previous_map_as_event => true
       timeout => 3
     }
}

output {
  elasticsearch {
    document_id => "%{id}"
    index => ""
    hosts => [""]
    user => ""
    password => ""
  }
}

Below is my Elasticsearch index mapping:

{
    "mappings": {
        "properties": {
            "user_id": {
                "type": "long"
            },
            "posts": {
                "type": "nested",
                "properties": {
                    "id": {
                        "type": "long"
                    },
                    "title": {
                        "type": "text"
                    },
                    "description": {
                        "type": "text"
                    }
                }
            },
            "friends": {
                "type": "nested",
                "properties": {
                    "id": {
                        "type": "long"
                    },
                    "name": {
                        "type": "text"
                    }
                }
            },
            "email": {
                "type": "text"
            },
            "name": {
                "type": "text"
            }
        }
    }
}
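
For reference, one aggregated event produced by the filter above would be indexed as a document shaped roughly like this (all values are made up purely for illustration):

{
    "user_id": 42,
    "name": "Jane Doe",
    "email": "jane@example.com",
    "posts": [
        {
            "id": 1,
            "title": "First post",
            "description": "Post body"
        }
    ],
    "friends": [
        {
            "id": 7,
            "name": "John Doe"
        }
    ]
}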

Everything seems to be fine, but I have a concern about the aggregation. My questions are:

  • How does Logstash handle pagination together with the aggregation? When Logstash fetches the first batch of 100000 rows from MySQL, the rows for one user can get split, so that the first batch contains the first half of that user's rows and the second fetch (round trip) returns the other half. Isn't there a possibility that, during the second round trip, the map aggregated so far for that user gets emptied by the timeout, so that the previously aggregated data is flushed by Logstash? I know all would be good if we didn't use pagination, because then the data would never be divided horizontally, but for this amount of data we must use pagination. (See the sketch after this list.)
  • If that is the case, how can we make sure that no data gets lost or overwritten by the second batch while we are performing the aggregation with pagination?
  • Another question: does Logstash empty the first batch of events before it makes the second round trip for the next batch, or does it keep them in memory too?
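
To make the first point concrete, here is a toy simulation in plain Ruby (not the real aggregate filter) of how "push_previous_map_as_event" is documented to behave: the current map is pushed downstream only when a new task_id shows up or when the timeout fires. The row values below are made up purely for illustration.

# Toy model: rows arrive ordered by users_list.id, and user 42's rows straddle
# the boundary between two JDBC fetch batches.
rows = [
  { id: 41, post_id: 1 }, { id: 42, post_id: 2 },  # end of fetch batch 1
  { id: 42, post_id: 3 }, { id: 43, post_id: 4 }   # start of fetch batch 2
]

pushed = []          # events pushed downstream
map = nil            # the aggregate map for the current task
current_task = nil

rows.each do |row|
  if current_task && current_task != row[:id]
    pushed << map    # a new task_id appears: push the previous map as an event
    map = nil
  end
  current_task = row[:id]
  map ||= { 'user_id' => row[:id], 'posts' => [] }
  map['posts'] << row[:post_id]
end
pushed << map        # the last map is only pushed on timeout or pipeline shutdown

pushed.each { |e| p e }
# User 42 keeps posts [2, 3] in a single event, because the fetch boundary does
# not change the order of task ids. Only a timeout firing in the gap between the
# two batches could split the user into two events, and the second event would
# then overwrite the first in Elasticsearch since both share the same document id.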

You will just have to make the aggregate timeout high enough that all the data for a user is processed before the event is flushed. Obviously increasing the timeout increases the amount of data in the map and therefore your memory footprint.
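
For illustration, a minimal sketch of that suggestion; the 3600 here is only a placeholder, whatever comfortably exceeds the duration of a full run:

filter {
  aggregate {
    task_id => "%{id}"
    code => "..."                      # same aggregation code as above
    push_previous_map_as_event => true
    # Placeholder value: set the timeout well above the time a whole import
    # takes, so a partially built map is never flushed between fetch batches.
    timeout => 3600
  }
}

Keep in mind that the aggregate filter documentation also asks for a single pipeline worker (pipeline.workers: 1) so that events are processed in order.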

Thanks for the reply Badger. Isn't there any other way than just increasing the timeout, since a timeout does not always guarantee consistency? And if there isn't, what do you think the timeout should be, in seconds, taking this amount of data into account?
It would also be really helpful if you could elaborate a bit on how fetch size works and how "push_previous_map_as_event" behaves in this case. Any response would be appreciated, thanks.

Never having used a jdbc input, I cannot say anything about how it works.

Thanks @Badger. Can you mention/connect anyone who can help with avoiding the "timeout" solution for this problem, and/or anyone who can explain what happens behind the scenes with "jdbc_fetch_size" so I can understand it better?
