Logstash not sending all data to Elasticsearch

I have a simple JDBC pipeline with no filters that fetches data from a database to feed Elasticsearch.

input {
   jdbc {
       jdbc_connection_string => "#connectionstring"
       jdbc_user => "#username"
       jdbc_password => "#password"
       jdbc_driver_library => "./logstash-core/lib/jars/mssql-jdbc-11.2.3.jre18.jar"
       jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
       clean_run => true
       record_last_run => false
       jdbc_paging_enabled => true
       jdbc_page_size => 1000
       jdbc_fetch_size => 1000
       statement => "
       SELECT
       *,
       'PERSON' + CAST(PERSON.Id AS VARCHAR(20)) AS uniqueid
       FROM PERSON"
   }
   jdbc {
       jdbc_connection_string => "#connectionstring"
       jdbc_user => "#username"
       jdbc_password => "#password"
       jdbc_driver_library => "./logstash-core/lib/jars/mssql-jdbc-11.2.3.jre18.jar"
       jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
       clean_run => true
       record_last_run => false
       jdbc_paging_enabled => true
       jdbc_page_size => 1000
       jdbc_fetch_size => 1000
       statement => "
       SELECT
       *,
       'INVOICE' + CAST(INVOICE.Id AS VARCHAR(20)) AS uniqueid
       FROM INVOICE"
   }
}
output {
   elasticsearch {
       hosts => ["#elasticsearchurl"]
       index => "#elasticindex"
       action => "index"
       document_id => "%{uniqueid}"
   }
}

I get varying results from this. Sometimes the Elasticsearch index gets all rows from Logstash, sometimes only 4000 rows, and sometimes 27000 out of 28000 rows. Sometimes the stats show more events filtered than received as input, but in those runs Elasticsearch receives all rows.

With this call:
curl -XGET 'localhost:9600/_node/stats/events?pretty'

I get these results (in this run all rows were received in Elasticsearch):

"events" : { 
  "in" : 28071, 
  "filtered" : 53140, 
  "out" : 53140, 
  "duration_in_millis" : 27416, 
  "queue_push_duration_in_millis" : 1233 
}

On a retry after clearing everything, only 4000 rows were received in Elasticsearch:

"events" : { 
 "in" : 28071, 
 "filtered" : 4000, 
 "out" : 4000, 
 "duration_in_millis" : 11047, 
 "queue_push_duration_in_millis" : 3763 
}

Following a troubleshooting guide, this command checks for an in/out discrepancy in the filter plugins:

curl -s localhost:9600/_node/stats | jq '.pipelines.main.plugins.filters[] | select(.events.in!=.events.out)'

But this returns nothing when run.
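Presumably a similar check can be run against the output plugins; this is my guess at the analogous jq path (main may need to be replaced with the actual pipeline id):

curl -s localhost:9600/_node/stats | jq '.pipelines.main.plugins.outputs[] | {id, events}'

That should show whether the elasticsearch output itself reports fewer events out than in.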
Any idea what could be the reason for this behaviour?

When you run that SQL, does it return 4000 records or 8000?

"select *, 'person' + cast(person.id as varchar(20)) as uniqueid" from both tables?

Do you get 4000 records in the index, or 8000?

Your document_id is uniqueid; could it be the same for some rows and be overwriting records?
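One quick way to check would be to count distinct uniqueid values on the SQL side, using the same expression as in the input statement, e.g.:

SELECT
    COUNT(*) AS total_rows,
    COUNT(DISTINCT 'PERSON' + CAST(PERSON.Id AS VARCHAR(20))) AS distinct_ids
FROM PERSON

If distinct_ids is lower than total_rows, documents with the same _id overwrite each other in the index. Overwrites also tend to show up as docs.deleted in GET _cat/indices?v.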

Hi Sachin,
When I run the SQL on both tables it returns 28071 rows.

one is

'INVOICE' + CAST(INVOICE.Id AS VARCHAR(20)) AS uniqueid

the other is

'PERSON' + CAST(PERSON.Id AS VARCHAR(20)) AS uniqueid

so it shouldn't overwrite.

I have rerun it a couple of times like this:

Remove the index in Elasticsearch.
Start Logstash.
Wait until it has finished according to the logs: [2023-03-31T10:21:56,427][INFO ][logstash.javapipeline ][index] Pipeline terminated {"pipeline.id"=>"index"}. No errors or warnings in the Logstash logs.
Check in Kibana that there are 28000 documents.
Check the Elasticsearch logs. No errors or warnings there either.

Twice the index had all 28071 documents.
Four times the index had 4000 documents, even after waiting 4 hours.
Four times the index had 27713 documents.

What I can see from localhost:9600/_node/stats/events?pretty is that Logstash is receiving 28071 rows correctly, but it sometimes filters 53140 rows and sometimes 4000 rows. I'm a bit confused how that could happen.

I have many databases pulling data, and my benchmark is:
if DatabaseA has X records, I should have X records in the index.

It seems like you have two jdbc inputs running in one Logstash, i.e. each is pulling 28071 records, and then somehow you are using a filter to combine them? Or are you just putting them into Elasticsearch as is?

That logic is still not clear to me. If you are ingesting them as is, then you should have 28071 + 28071 records in Elasticsearch.

Yeah, that's what I thought too. I don't have any filters (see my first post), only inputs from the two tables PERSON and INVOICE, which should generate a total of 28071 records.

I'm running logstash 8.7.0.

Could there be anything in the configuration that could mess with the data?
Are there any other troubleshooting methods?

Change the jdbc_fetch_size and jdbc_page_size to 1001 in one input and 1002 in the other. See if the 4000 number changes.
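For example, in the first input:

    jdbc_paging_enabled => true
    jdbc_page_size => 1001
    jdbc_fetch_size => 1001

and 1002 in the second. If the stalled count tracks the page size, that points at the paging.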

Also, if you only have one of the two inputs in the configuration, does it ever stop after a few thousand events?

Also, enable --log.level debug. The statement handler will log a message as it processes each page.
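That is, start Logstash with something like

bin/logstash --log.level debug -f your-pipeline.conf

(or set log.level: debug in logstash.yml).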

I did a little experimenting. I had this in my pipelines.yml file:

- pipeline.id: test
  queue.type: persisted
  pipeline.batch.size: 1000
  path.config: "config/test.conf"

while in test.conf I had:

	jdbc_paging_enabled => true
	jdbc_page_size => 1000
	jdbc_fetch_size => 1000

By removing pipeline.batch.size: 1000 from the pipelines.yml file, the case where I got more events out than in was gone. Now I only get fewer events out than in:

"in" : 28071,
 "filtered" : 14108,
 "out" : 14108,

"in" : 28071,
"filtered" : 17875,
"out" : 17875,

"in" : 28071,
"filtered" : 18608,
"out" : 18608,

I couldn't find any logical pattern when using 1001 or 1002.

I think I found out why it's happening. I had changed the query (including adding fields), but the new fields didn't reach Elasticsearch, and restarting Logstash didn't fix it either. I think Logstash was stashing the old messages somehow and resending them to Elasticsearch.

I had queue.type: persisted set to protect against message loss.

When I changed back to the default setting queue.type: memory, everything worked as expected: Elasticsearch got the new fields and all rows were indexed.
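I assume that if I wanted to keep queue.type: persisted I would instead have to drain or clear the old queue before changing the query, e.g. by stopping Logstash and deleting the queue data for that pipeline (the queue lives under path.data, which defaults to the data directory in the Logstash home, so something like):

rm -rf /path/to/logstash/data/queue/test

That way old events sitting in the persisted queue would not be replayed into the new index.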
