Field in MSSQL as ID and the use of versioning

Hello, I hope I've put this in the correct place.

My scenario is:
I have data accessible via the JDBC input on MSSQL.
The JDBC input plugin runs a command to find records that match a certain query.
The output is an Elastic Cloud index.

The records that come in via JDBC come with an ID number and the results contain a large amount of duplicate data as the query regularly matches records that have been updated. Duplicate data being records that have the same ID number.

The ID numbers are not incremental when they are returned.

I'd like to use versioning so that when an ID number that has already been indexed arrives it is treated as a new version. Any ID numbers that arrive that have not been indexed will be indexed as a new document.

I believe I can use Upsert for this. I can't find how to designate the ID number returned from the MSSQL database via JDBC as the ID.

Thanks in advance for any help provided.

If you have a unique id on each record then if you set the document_id any new document with the same id will overwrite the document already in elasticsearch. Use a sprintf reference to set the option to the value of a field.

Thanks for the response, @Badger
That is the behaviour I was expecting too but it isn't working for me. I've pasted my Logstash pipeline code for reference.

input {
  jdbc {

    jdbc_connection_string => "jdbc:sqlserver://EUdbro.haloitsm.com:7006;databaseName=synergy;encrypt=true;trustServerCertificate=true;"
    jdbc_user => "xxxxxxxxxxxx"
    jdbc_password => "xxxxxxxxxxxx"
    jdbc_driver_library => "/home/ubuntu/Documents/sqljdbc_12.2/enu/mssql-jdbc-12.2.0.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "
select
faultid as [Ticket ID],
symptom as [Subject],
uname as [Agent],
aareadesc as [Customer],
sdesc as [Site],
uusername as [User],
rtdesc as [Ticket Type],
tstatusdesc as [Status],
category2 as [Category],
dateoccured  as [Date Opened],
case when slaresponsestate='I' then 'Inside'
when slaresponsestate='O' then 'Outside'
when slaresponsestate is null then 'Awaiting Response' end as [Response State],
frespondbydate as [Response Date],
datecleared as [Closed Date]
from faults 
join tstatus on tstatus=status
join requesttype on rtid=requesttypenew 
join area on aarea=areaint
join site on ssitenum=sitenumber
join users on uid=userid
join uname on unum=assignedtoint
WHERE tstatusdesc='Closed'
ORDER BY 'ticket id' ASC 
"
#    use_column_value => true
#    tracking_column => "ticket id"
#    tracking_column_type => numeric
#    last_run_metadata_path => "/testdata/test_manual.yml"
#    record_last_run => true
    schedule => "*/5 * * * *"
#   clean_run => false
  }
}

output {
  elasticsearch {
    hosts => ["https://xxxxxxxxxxxxxx.us-central1.gcp.cloud.es.io:443"]
    user => "xxxxxxx"
    password => "xxxxxxxxxxxxx"    
    index => "shalo-closed-tickets-%{+YYYY.MM}"
    action => "update"
    document_id => "%{ticket id}"
    doc_as_upsert => true
   pipeline => "shalo_tickets"
  }

Try document_id => "%{Ticket ID}"

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.