The Logstash aggregate filter is skipping records. I have set pipeline.workers to 1, but nested records are still missing.
I followed the instructions here, but nothing works. It does not skip records every time; it skips them in about 2 out of 5 runs. That is, if I run Logstash 5 times, it works perfectly 3 times and fails twice. Here is my filter:
aggregate {
  # Folds every row (event) that shares a db_id into ONE event whose
  # box_posts array holds one entry per row. The source rows themselves are
  # cancelled; only the aggregated map is emitted.
  #
  # Requirements for push_previous_map_as_event to work (see the aggregate
  # filter documentation):
  #   * events must arrive grouped by task id (all rows of db_id A, then all
  #     rows of db_id B, ...), so the input query has to be ordered by db_id;
  #   * the pipeline must process events strictly in order with one worker.
  # NOTE(review): on Logstash versions that have the pipeline.ordered setting,
  # confirm it is enabled together with pipeline.workers: 1; otherwise events
  # can be reordered and rows end up in the wrong map.
  task_id => "%{db_id}"
  code => "
    map['db_id'] = event.get('db_id')
    map['box_posts'] ||= []

    # One box_posts entry, built from the current row.
    model = {
      'box_id' => event.get('box_id'),
      'score' => event.get('score'),
      'post_box_id' => event.get('post_box_id'),
      'user_id' => event.get('user_id'),
      'text_content' => event.get('text_content'),
      'created_at' => event.get('post_created'),
      'id' => event.get('post_id'),
      'post_type_id' => event.get('post_type_id'),
      'short_code' => event.get('short_code'),
      'created_location' => {
        'lon' => event.get('lon'),
        'lat' => event.get('lat')
      }
    }

    # Media attached to the post.
    model['post_media'] = {
      'id' => event.get('media_id'),
      'bg_color' => event.get('bg_color'),
      'file' => event.get('media_file'),
      'file_base_name' => event.get('file_base_name'),
      'file_type' => event.get('file_type'),
      'file_type_number' => event.get('file_type_number'),
      'file_width' => event.get('file_width'),
      'file_height' => event.get('file_height'),
      'is_primary' => event.get('is_primary'),
      'file_size' => event.get('file_size'),
      'medium_file_width' => event.get('medium_file_width'),
      'medium_file_height' => event.get('medium_file_height'),
      'thumb_file_height' => event.get('thumb_file_height'),
      'thumb_file_width' => event.get('thumb_file_width'),
      'collage_file_height' => event.get('collage_file_height'),
      'collage_file_width' => event.get('collage_file_width'),
      'notification_file_width' => event.get('notification_file_width'),
      'notification_file_height' => event.get('notification_file_height'),
      'home_file_height' => event.get('home_file_height'),
      'home_file_width' => event.get('home_file_width'),
      'md5_filename' => event.get('md5_filename'),
      'bucket' => event.get('media_bucket'),
      'bg_color_collage' => event.get('bg_color_collage')
    }

    # Location details are only added when the row carries a location_id.
    unless event.get('location_id').nil?
      model['location'] = {
        'fs_location_id' => event.get('fs_location_id'),
        'location_name' => event.get('location_name'),
        'map_image' => event.get('map_image'), # bool
        'address_name' => event.get('address_name'),
        'bg_image' => event.get('location_bg_image'),
        'thumbnail' => event.get('thumbnail'),
        'rating' => event.get('location_rating'), # float
        'total_reviews' => event.get('total_reviews'),
        'file' => event.get('file'),
        'bucket' => event.get('location_bucket'),
        # An empty string is replaced by 0; any other value passes through.
        'width' => event.get('location_width') != '' ? event.get('location_width') : 0,
        'height' => event.get('location_height') != '' ? event.get('location_height') : 0,
        'phone' => event.get('location_phone'),
        'other_info' => event.get('other_info'),
        'website' => event.get('website')
      }
      # lon is fed from the longitude field and lat from the latitude field,
      # matching created_location above. Crossing the two would place
      # longitudes in the lat slot.
      model['place'] = {
        'lon' => event.get('location_longitude'),
        'lat' => event.get('location_latitude')
      }
    end

    # Extra attributes exist only for posts whose post_type_id is 7.
    if event.get('post_type_id') == 7
      source_type = event.get('search_post_source_type')
      model['post_attributes'] = {
        'title' => event.get('search_post_title'),
        'thumbnail' => event.get('search_post_thumbnail'),
        'bg_image' => event.get('search_post_bg_image'),
        'source_type' => source_type,
        'source_id' => event.get('search_post_source_id'),
        'source_link' => event.get('search_post_source_link'),
        'item_type' => event.get('search_post_item_type'),
        'item_type_number' => event.get('search_post_item_type_number'),
        'rating' => event.get('search_post_rating')
      }

      # Sources other than google and web carry additional descriptive fields.
      if source_type != 'google' && source_type != 'web'
        model['post_attributes'].merge!({
          'source_title' => event.get('post_search_attribute_title'),
          'owner' => event.get('post_search_attribute_owner'),
          'caption' => event.get('post_search_attribute_caption'),
          'description' => event.get('post_search_attribute_description'),
          'source_rating' => event.get('post_search_attribute_rating'),
          'total_reviews' => event.get('post_search_attribute_total_reviews'),
          'total_likes' => event.get('post_search_attribute_total_likes'),
          'total_dislikes' => event.get('post_search_attribute_total_dislikes'),
          'total_pages' => event.get('post_search_attribute_total_pages'),
          'gallery' => event.get('post_search_attribute_gallery'),
          'stars' => event.get('post_search_attribute_stars'),
          'trailers' => event.get('post_search_attribute_trailers'),
          'external_url' => event.get('post_search_attribute_external_url')
        })
      end

      # For google sources the gallery field, when present, is stored as trailers.
      if source_type == 'google' && !event.get('gallery').nil?
        model['post_attributes'].merge!({ 'trailers' => event.get('gallery') })
      end
    end

    map['box_posts'] << model

    # Drop the per-row event; the aggregated map is pushed as its own event.
    event.cancel()
  "
  push_previous_map_as_event => true
  # NOTE(review): per the aggregate filter documentation, timeout is counted in
  # seconds from the FIRST event of a task. If the rows of one db_id can take
  # longer than this to arrive, the map is flushed early and the remaining rows
  # start a second map for the same db_id. Confirm 5 is large enough.
  timeout => 5
}