Logstash aggregate filter skipping records

logstash aggregate filter skipping records. i have set the pipeline.worker to 1 but it's still missing nested records.
I follow the instruction at here but noting work. It is not skipping records all the time. but it's skipping records 2 time in 5 try. means if i run logstash 5 time then it work 3 time perfectly. but it fails two times . here is my filter.

> aggregate {
  	task_id => "%{db_id}"
  	code => "
  		map['db_id'] = event.get('db_id')
  		
  		map['box_posts'] ||= []
  		model = {
  			'box_id' => event.get('box_id'),
  			'score' => event.get('score'),
  			'post_box_id' => event.get('post_box_id'),
  			'user_id' => event.get('user_id'),
  			'text_content' => event.get('text_content'),
  			'created_at' => event.get('post_created'),
  			'id' => event.get('post_id'),
  			'post_type_id' => event.get('post_type_id'),
  			'short_code' => event.get('short_code'),
  			'created_location' => { 'lon' =>  event.get('lon') ,  'lat' => event.get('lat') },
  		}

  		model['post_media'] = { 

  			'id' =>  event.get('media_id'), 
  			'bg_color' =>  event.get('bg_color'), 
  			'file' =>  event.get('media_file'), 
  			'file_base_name' =>  event.get('file_base_name'), 
  			'file_type' =>  event.get('file_type'), 
  			'file_type_number' =>  event.get('file_type_number'), 
  			'file_width' =>  event.get('file_width'), 
  			'file_height' =>  event.get('file_height'), 
  			'is_primary' =>  event.get('is_primary') , 
  			'file_size' =>  event.get('file_size'), 
  			'medium_file_width' =>  event.get('medium_file_width'),
  			'medium_file_height' =>  event.get('medium_file_height'), 
  			'thumb_file_height' =>  event.get('thumb_file_height'), 
  			'thumb_file_width' =>  event.get('thumb_file_width'), 
  			'collage_file_height' =>  event.get('collage_file_height'), 
  			'collage_file_width' =>  event.get('collage_file_width'), 
  			'notification_file_width' =>  event.get('notification_file_width'), 
  			'notification_file_height' =>  event.get('notification_file_height'), 
  			'home_file_height' =>  event.get('home_file_height'), 
  			'home_file_width' =>  event.get('home_file_width'), 
  			'md5_filename' =>  event.get('md5_filename'), 
  			'bucket' =>  event.get('media_bucket'), 
  			'bg_color_collage' =>  event.get('bg_color_collage') 
  		}


  		if (event.get('location_id') != nil)
  			
  			model['location'] = { 
  				'fs_location_id' =>  event.get('fs_location_id') ,  
  				'location_name' =>  event.get('location_name') ,  
  				'map_image' =>  event.get('map_image') ,  #bool 
  				'address_name' =>  event.get('address_name') ,  
  				'bg_image' =>  event.get('location_bg_image') ,  
  				'thumbnail' =>  event.get('thumbnail') ,  
  				'rating' =>  event.get('location_rating') ,  # float
  				'total_reviews' =>  event.get('total_reviews') ,  
  				'file' =>  event.get('file') ,  
  				'bucket' =>  event.get('location_bucket') ,  
  				'width' =>  event.get('location_width') != '' ? event.get('location_width') : 0,  
  				'height' =>  event.get('location_height') != '' ? event.get('location_height'): 0,  
  				'phone' =>  event.get('location_phone') ,  
  				'other_info' =>  event.get('other_info') ,  
  				'website' =>  event.get('website')   

  			}
  			model['place'] = { 
  				'lon' =>  event.get('location_latitude') ,  
  				'lat' => event.get('location_longitude')
  			}
  		end
  		

  		if ( event.get('post_type_id') == 7 )

  		        source_type = event.get('search_post_source_type')

  			model['post_attributes'] = {
  				'title' => event.get('search_post_title') ,
  				'thumbnail' => event.get('search_post_thumbnail') ,
  				'bg_image' => event.get('search_post_bg_image') ,
  				'source_type' => event.get('search_post_source_type') ,
  				'source_id' => event.get('search_post_source_id') ,
  				'source_link' => event.get('search_post_source_link') ,
  				'item_type' => event.get('search_post_item_type') ,
  				'item_type_number' => event.get('search_post_item_type_number') ,
  				'rating' =>  event.get('search_post_rating'), 

  			}
  						
  			if( source_type !=  'google' && source_type !=  'web'  )

  				model['post_attributes'].merge! ( {
  					
  					
  					'source_title' => event.get('post_search_attribute_title'),
  					'owner' =>  event.get('post_search_attribute_owner'), 
  					'caption' =>  event.get('post_search_attribute_caption'), 
  					'description' =>  event.get('post_search_attribute_description'), 
  					'source_rating'  =>  event.get('post_search_attribute_rating'), 
  					'total_reviews' =>  event.get('post_search_attribute_total_reviews'), 
  					'total_likes' =>  event.get('post_search_attribute_total_likes'), 
  					'total_dislikes' =>  event.get('post_search_attribute_total_dislikes'), 
  					'total_pages' =>  event.get('post_search_attribute_total_pages'), 
  					'gallery' =>  event.get('post_search_attribute_gallery'), 
  					'stars' =>  event.get('post_search_attribute_stars'), 
  					'trailers' =>  event.get('post_search_attribute_trailers'), 
  					'external_url' =>  event.get('post_search_attribute_external_url')
  				})
  			end 
  			
  			if( source_type ==  'google' && event.get('gallery') !=  nil  ) 
  				model['post_attributes'].merge! ( { 
  					'trailers' =>  event.get('gallery') 
  				})
  			end 

  		end	
  		
  	map['box_posts'] <<  model
  	event.cancel()"
  	push_previous_map_as_event => true
  	timeout => 5

}

Does disabling java_execution help?

No, It's also not working. @Badger Please help?

I do not have any other suggestions.

ok. kindly let me know how to append at the end of existing nested array field via logstash.

I have an array field in ES. I want to append at the end of array field in ES using logstash . I am about to customize my problem .

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.