Ingestion not consistent, data loss

We're ingesting a large amount of data from an Oracle database into Elasticsearch through Logstash.
We import information from several Oracle tables into different arrays in Elasticsearch: each table corresponds to an array, and each row corresponds to an element of that array.
Sometimes it imports, let's say, 5 rows from a table and the document shows a 5-element array.
Other times it imports those same 5 rows but the document shows only 2 or 3 array elements in Elasticsearch.
The behavior is random and it happens with arrays holding different types of information.

Using the stdout plugin I can see that all 5 elements of the array do reach the output stage in Logstash.
What could be happening so that they sometimes don't all appear in Elasticsearch?

Could it be that they are not sent to Elasticsearch at all, or that Elasticsearch is not able to index them because such a big load of information is being processed?
I don't see any issue reported in the Logstash logs.
Do you have an idea what the issue could be? If not, how can I check what is reaching Elasticsearch, and what can I tune in the output plugin or in the Logstash/Elasticsearch config files?
Any hint would be much appreciated as we're running out of options.


Please share your logstash configuration and a sample of your message.

Depending on what the data looks like and on the index mapping, some documents might not be indexed, but if Elasticsearch had a problem indexing a document you would get a log line in the Logstash log.

Do you have nothing in the Logstash logs?

Can you share an example of what those different types look like?

Are you using a jdbc input? That will return each row as a separate event. If you are combining those events into a single event that has an array of rows, then you will need to run with --pipeline.workers 1, otherwise the rows could be spread across multiple worker threads.
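To illustrate why row order matters here, below is a small Ruby sketch. It is a simplified model and not the aggregate plugin's code: `aggregate_rows` and `index_events` are hypothetical helpers, and the interleaved order is made up. It assumes that the previous map is pushed whenever the task id changes, and that a later update on the same document id replaces the stored array.

```ruby
# Simplified model of "push the previous map as an event when the task id changes".
# NOT the aggregate plugin's implementation, only an illustration of that idea.
def aggregate_rows(rows)
  emitted = []        # aggregated events pushed downstream
  current_id = nil
  current_rows = []

  rows.each do |row|
    if current_id && row[:per_id] != current_id
      # A different id arrived: flush what was collected so far.
      emitted << { per_id: current_id, jd_knowledges: current_rows }
      current_rows = []
    end
    current_id = row[:per_id]
    current_rows << row
  end

  # Flush the last map (stands in for the timeout flush).
  emitted << { per_id: current_id, jd_knowledges: current_rows } if current_id
  emitted
end

# Assumption: a later event with the same id replaces the array stored earlier.
# Returns { per_id => number of array elements finally stored }.
def index_events(events)
  events.each_with_object({}) { |e, idx| idx[e[:per_id]] = e[:jd_knowledges].size }
end

# Five rows for person 1, two rows for person 2, in database order.
ordered = [1, 1, 1, 1, 1, 2, 2].each_with_index.map { |id, i| { per_id: id, jik_id: i } }

# Hypothetical interleaving: a row of person 2 overtakes the last two rows of person 1.
interleaved = [ordered[0], ordered[1], ordered[2], ordered[5],
               ordered[3], ordered[4], ordered[6]]

ORDERED_RESULT = index_events(aggregate_rows(ordered))
INTERLEAVED_RESULT = index_events(aggregate_rows(interleaved))

puts "in order:    #{ORDERED_RESULT}"
puts "interleaved: #{INTERLEAVED_RESULT}"
```

In this model the in-order run keeps 5 elements for person 1, while the interleaved run ends with only 2, because the map was flushed early and a later, smaller event replaced the first one.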

Thanks Leandro, I've shared the config file and ruby file.

Thanks Badger. We have pipeline.workers: 5. I had thought about something of that sort and tested with just 1 worker. It gets slow, as expected, but the main issue is that I was getting a read timeout on the input plugin with that setting. Do you have any suggestion to avoid that timeout with just 1 worker?
Here's a sample of a config file that we're using:

input {
  jdbc {
    lowercase_column_names => true
    jdbc_driver_library => ""
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@${DB_HOST_SYSPER}:${DB_PORT_SYSPER}/${DB_SERVICE_SYSPER}"
    jdbc_user => "${DB_USER_SYSPER}"
    jdbc_password => "${DB_PASS_SYSPER}"
    tracking_column => "per_id"
    statement => "..."
  }
}

filter {
    # Collect all rows that share a per_id into one event
    aggregate {
        task_id => "%{per_id}"
        code => "
            map['per_id'] = event.get('per_id')
            map['jd_knowledges'] ||= []
            map['jd_knowledges'] << event.to_hash # JSON.parse(event.to_json) # a small hack to get event data
        "
        push_previous_map_as_event => true
        timeout => 30
    }

    fingerprint {
        key => "HR_SEARCH_KEY"
        source => "per_id"
        base64encode => true
        target => "[@metadata][fingerprint]"
        method => "SHA1"
    }

    # Look up the existing document to fetch its current job_description
    elasticsearch {
        user => "${ELASTIC_USER}"
        password => "${ELASTIC_PASSWORD}"
        ca_file => "${LOGSTASH_CERTS}/cacert.pem"
        index => "${INDEX_NAME}"
        query => '_id:"person:%{[@metadata][fingerprint]}"'
        fields => { "[car][current_assignment][job_description]" => "job_description" }
    }

    ruby {
        path => "${LOGSTASH_DATA_PATH}/task5.rb"
    }

    mutate {
        remove_field => ["per_id", "jd_knowledges", "job_description"]
    }

    sleep {
        time => "1"   # Sleep 1 second
        every => 250   # on every 250th event
    }
}

output {
    elasticsearch {
        user => "${ELASTIC_USER}"
        password => "${ELASTIC_PASSWORD}"
        ssl => true
        cacert => "${LOGSTASH_CERTS}/cacert.pem"
        index => "${INDEX_NAME}"
        document_id => "person:%{[@metadata][fingerprint]}" #"person:%{perId}"
        doc_as_upsert => true
        action => "update"
        http_compression => true
    }
}

And the Ruby file used:

def filter(event)
  knowledges_data = event.get('jd_knowledges') || []

  unless knowledges_data.empty?
    job_description = event.get('job_description') || {}

    # Group the aggregated rows by jik_id, keeping the first non-nil value per field
    knowledges_hash = {}
    knowledges_data.each do |function_entry|
      key = function_entry['jik_id'].to_s
      knowledges_hash[key] ||= {}
      %w( jdl_id jdl_descr jdl_obsolete jdl_id_reports_to jdl_descr_reports_to level_id jdl_id_parent_lev_2 jdl_id_parent_descr_lev_2 jdl_id_parent_lev_1 jdl_id_parent_descr_lev_1 total_cnt partial_cnt ).each do |field_name|
        value = function_entry[field_name]
        knowledges_hash[key][field_name.to_sym] ||= value if value
      end
    end

    # Turn the grouped hash into the array stored on the document
    jd_knowledges = []
    knowledges_hash.each do |jik_id, params_obj|
      if jik_id
        params_obj[:jik_id] = jik_id.to_i
        jd_knowledges << params_obj
      end
    end

    job_description['knowledges'] = jd_knowledges
    event.set('[car][current_assignment][job_description]', job_description)
  end

  # A ruby filter script must return the list of events to pass on
  [event]
end
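One way to rule out the script itself is to run its grouping logic outside Logstash. The sketch below is only an approximation under stated assumptions: `FakeEvent` is a hypothetical stand-in for the Logstash event API, `FIELDS` is a shortened field list, `knowledge_count` is a made-up helper, and `filter` is a condensed version of the script above that assumes each grouped entry is appended to the array.

```ruby
# Hypothetical stand-in for a Logstash event: stores values under the exact
# field string it is given and does no nested field resolution.
class FakeEvent
  def initialize(data)
    @data = data
  end

  def get(field)
    @data[field]
  end

  def set(field, value)
    @data[field] = value
  end
end

# Shortened field list for this example; the real script copies more columns.
FIELDS = %w(jdl_id jdl_descr level_id).freeze

# Condensed grouping logic: one output element per distinct jik_id,
# keeping the first non-nil value seen for each field.
def filter(event)
  rows = event.get('jd_knowledges') || []
  return [event] if rows.empty?

  grouped = {}
  rows.each do |row|
    key = row['jik_id'].to_s
    grouped[key] ||= {}
    FIELDS.each do |name|
      value = row[name]
      grouped[key][name.to_sym] ||= value if value
    end
  end

  knowledges = grouped.map { |jik_id, params| params.merge(jik_id: jik_id.to_i) }
  job_description = event.get('job_description') || {}
  job_description['knowledges'] = knowledges
  event.set('[car][current_assignment][job_description]', job_description)
  [event]
end

# Runs the filter on a set of rows and returns the resulting array size.
def knowledge_count(rows)
  event = FakeEvent.new('jd_knowledges' => rows)
  filter(event)
  event.get('[car][current_assignment][job_description]')['knowledges'].size
end

# Five rows with distinct jik_id values.
distinct = (1..5).map { |i| { 'jik_id' => i, 'jdl_id' => 100 + i } }
# The same rows plus one extra row that repeats jik_id 1.
repeated = distinct + [{ 'jik_id' => 1, 'jdl_id' => 999 }]

DISTINCT_COUNT = knowledge_count(distinct)
REPEATED_COUNT = knowledge_count(repeated)

puts "distinct rows: #{DISTINCT_COUNT} elements"
puts "with repeated jik_id: #{REPEATED_COUNT} elements"
```

Both runs give 5 elements, since rows that share a jik_id collapse into one entry. So when comparing the row count with the array size, it is worth counting distinct jik_id values rather than rows.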

As @Badger mentioned, you need to set pipeline.workers to 1 if you are using the aggregate filter. This is required for the aggregate filter to work correctly, and yes, one of the drawbacks is that it makes processing slow, as you are using only one core.

Can you share a log of this timeout? The number of workers should only have an impact on the filter and output blocks, not on the input block.

Thanks @leandrojmp and @Badger for your great help. Setting the number of workers to 1 solved the issue. There was no timeout in the subsequent tests.
