My scenario is:
I have data accessible via the JDBC input on MSSQL.
The JDBC input plugin runs a command to find records that match a certain query.
The output is an Elastic Cloud index.
Each record that comes in via JDBC has an ID number, and the results contain a large number of duplicates (records with the same ID number) because the query regularly matches records that have been updated.
The ID numbers are not incremental when they are returned.
I'd like to use versioning so that when an ID number that has already been indexed arrives, it is treated as a new version of that document. Any ID number that has not been indexed yet should be indexed as a new document.
I believe I can use upsert for this, but I can't find how to designate the ID number returned from the MSSQL database via JDBC as the document ID.
If each record has a unique id and you set the document_id option on the elasticsearch output, any new document with the same id will overwrite the document already in Elasticsearch. Use a sprintf reference to set the option to the value of a field.
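As a minimal sketch of what that looks like, assuming the event carries its unique id in a field named `id` (the field name, host, and index below are placeholders for illustration, not taken from any real pipeline):

```
output {
  elasticsearch {
    # Placeholder endpoint and index name, assumed for illustration only.
    hosts => ["https://localhost:9200"]
    index => "tickets"
    # sprintf reference: the value of the event's [id] field is used as the
    # document _id, so a later event with the same id targets the same document.
    document_id => "%{id}"
  }
}
```

With the default `index` action, a second event with the same id replaces the earlier document in that index, and Elasticsearch increments the document's `_version`.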
Thanks for the response, @Badger
That is the behaviour I was expecting too but it isn't working for me. I've pasted my Logstash pipeline code for reference.
input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://EUdbro.haloitsm.com:7006;databaseName=synergy;encrypt=true;trustServerCertificate=true;"
    jdbc_user => "xxxxxxxxxxxx"
    jdbc_password => "xxxxxxxxxxxx"
    jdbc_driver_library => "/home/ubuntu/Documents/sqljdbc_12.2/enu/mssql-jdbc-12.2.0.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "
      select
        faultid as [Ticket ID],
        symptom as [Subject],
        uname as [Agent],
        aareadesc as [Customer],
        sdesc as [Site],
        uusername as [User],
        rtdesc as [Ticket Type],
        tstatusdesc as [Status],
        category2 as [Category],
        dateoccured as [Date Opened],
        case when slaresponsestate='I' then 'Inside'
             when slaresponsestate='O' then 'Outside'
             when slaresponsestate is null then 'Awaiting Response' end as [Response State],
        frespondbydate as [Response Date],
        datecleared as [Closed Date]
      from faults
      join tstatus on tstatus=status
      join requesttype on rtid=requesttypenew
      join area on aarea=areaint
      join site on ssitenum=sitenumber
      join users on uid=userid
      join uname on unum=assignedtoint
      WHERE tstatusdesc='Closed'
      ORDER BY 'ticket id' ASC
    "
    # use_column_value => true
    # tracking_column => "ticket id"
    # tracking_column_type => numeric
    # last_run_metadata_path => "/testdata/test_manual.yml"
    # record_last_run => true
    schedule => "*/5 * * * *"
    # clean_run => false
  }
}
output {
  elasticsearch {
    hosts => ["https://xxxxxxxxxxxxxx.us-central1.gcp.cloud.es.io:443"]
    user => "xxxxxxx"
    password => "xxxxxxxxxxxxx"
    index => "shalo-closed-tickets-%{+YYYY.MM}"
    action => "update"
    document_id => "%{ticket id}"
    doc_as_upsert => true
    pipeline => "shalo_tickets"
  }
}