How to configure Logstash to get only new/updated data into Elasticsearch

Dear All,

I have been facing this issue for a long time. I have a unique column on the SQL Server side, and I want to get only new/updated data into Elasticsearch every 15 minutes.

How do I do this, either after loading the data into the Elasticsearch index for the first time or before loading it?

I tried running it after loading the data into Elasticsearch, and I got the same data twice.

Input:

tracking_column => "row_num"
use_column_value => true
schedule => "*/15 * * * * *"
statement => "SELECT *, ROW_NUMBER() OVER(ORDER BY HEALTHRECORDKEY) row_num from(
SELECT DISTINCT S.HEALTHRECORDKEY,NULL PRODUCTCODE,S.FIRSTNAME,S.LASTNAME,NULL DRUGDESCRIPTION,
PUL.LABTEST_TYPE_DESC,PUL.LABTEST_DATE, PUL.DIAG_SERVICE_LOCATION,PUL.LABTEST_RESULT_DATE
FROM EDW.CONSENT.BENEFICIARY_DETAILS S
INNER JOIN CDR.PHR_USER_HOSPITALIZATION PU ON S.HEALTHRECORDKEY = PU.HEALTHRECORDKEY
INNER JOIN CDR.ORDER_INFO OI ON PU.HOSPITALIZATION_ID = OI.HOSPITALIZATION_ID
INNER JOIN CDR.PHR_USER_LABTEST PUL ON OI.ORDER_ID = PUL.ORDER_ID

	UNION

	SELECT DISTINCT S.HEALTHRECORDKEY,PRODUCTCODE,S.FIRSTNAME,S.LASTNAME,DRUGDESCRIPTION,NULL,NULL,NULL,NULL 
	FROM EDW.CONSENT.BENEFICIARY_DETAILS S 
	INNER JOIN EDW.CDR.ERX_PATIENT_MEDICATION PM ON S.HEALTHRECORDKEY = PM.HEALTHRECORDKEY)a"

Output:

output {
  elasticsearch {
    hosts => ["ip:9200"]
    index => "row_num"
    document_type => "labs"
    document_id => "%{row_num}"
  }
}

Here row_num is unique for each record, and I am trying to track this column.

Need: please help me get new/updated data into Elasticsearch (whether the column value has to be tracked after or before the initial load).

Thanks
HadoopHelp
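
One thing worth checking in the input above: as far as I know, the jdbc input's `schedule` option uses rufus-scheduler cron syntax, where a six-field expression is read with seconds as the first field. Under that reading, `*/15 * * * * *` fires every 15 seconds rather than every 15 minutes. A minimal sketch of a 15-minute schedule follows; the connection settings and the table name `my_table` are placeholders and are not from the original post.

```conf
input {
  jdbc {
    # Placeholder connection settings (hypothetical, replace with your own).
    jdbc_driver_library => "/path/to/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://db-host:1433;databaseName=mydb"
    jdbc_user => "logstash_reader"
    # Five fields: minute, hour, day of month, month, day of week.
    # "*/15" in the minute field means every 15 minutes.
    schedule => "*/15 * * * *"
    statement => "SELECT * FROM my_table"
  }
}
```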

I have the same setup.
I do not get duplicate data because I use a unique field as document_id.

Can you show us the duplicate data? What does it look like?

It should basically replace the old record with the new one, because the document_id is unique.
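
To illustrate the point about `document_id`: Elasticsearch overwrites an existing document when a new event arrives with the same `_id`, so a row that is read again replaces its old copy instead of creating a duplicate. This only holds if the id is stable for a given row. A number generated by `ROW_NUMBER()` can shift when rows are inserted or deleted, so the sketch below assumes the query also returns a stable key column, here a hypothetical `record_id` that is not in the original query. The host and index name are placeholders too.

```conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]   # placeholder host
    index => "labs"               # placeholder index name
    # The same record_id always maps to the same _id,
    # so a re-read row overwrites the earlier document.
    document_id => "%{record_id}"
  }
}
```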

Hi @elasticforme,

Thanks for being here. Are you running Logstash after the first load of data into the Elasticsearch index, or before?

Thanks
HadoopHelp

I have this running as a pipeline.

Basically, you track the column row_num. That means, in theory, it should only scan new row_num values.

But if you want to test it, just do this:

select * from xyz where row_num >= 100 and row_num <= 110;

This selects only 11 records. Run it again and your index should still contain 11 records.
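
The tracking idea can be sketched as follows. With `use_column_value => true`, Logstash stores the last seen value of `tracking_column` and exposes it to the query as `:sql_last_value`. The statement has to reference that parameter itself, otherwise every scheduled run re-reads the whole result set. The table `xyz` is the one from the test query above; the connection settings are placeholders, and `row_num` is assumed here to be a real, ever-increasing column in the table rather than a value computed by `ROW_NUMBER()`.

```conf
input {
  jdbc {
    # Placeholder connection settings (hypothetical, replace with your own).
    jdbc_driver_library => "/path/to/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://db-host:1433;databaseName=mydb"
    jdbc_user => "logstash_reader"
    schedule => "*/15 * * * *"
    # Remember the highest row_num seen so far between runs.
    use_column_value => true
    tracking_column => "row_num"
    tracking_column_type => "numeric"
    # Only rows above the stored value are fetched on each run.
    statement => "SELECT * FROM xyz WHERE row_num > :sql_last_value ORDER BY row_num"
  }
}
```

The `ORDER BY` keeps the stored value equal to the highest `row_num` actually read, since Logstash records the value from the last row of each run.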

Hi @elasticforme,

Thanks, but it is not updating the previously loaded data in the Elasticsearch index with the changes from the SQL database.

Note: it only fetches new data. For example, if the logstash.lastrun file stores 1001, it fetches rows from 1001 upwards, but it does not update the data already in the Elasticsearch index.

Any ideas, please?

Thanks
HadoopHelp
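
This is the expected behaviour when the tracking column is a row number: an updated row keeps a value below the stored one, so a `row_num > :sql_last_value` filter never selects it again. One common approach is to track a modification timestamp instead and let `document_id` overwrite the changed rows. The sketch below assumes the source table has such a timestamp (a hypothetical `last_modified` column, not present in the original query) and a stable key (a hypothetical `record_id`); the table name, connection settings, host, and index name are placeholders as well.

```conf
input {
  jdbc {
    # Placeholder connection settings (hypothetical, replace with your own).
    jdbc_driver_library => "/path/to/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://db-host:1433;databaseName=mydb"
    jdbc_user => "logstash_reader"
    schedule => "*/15 * * * *"
    # Track the modification time, so inserts and updates are both picked up.
    use_column_value => true
    tracking_column => "last_modified"
    tracking_column_type => "timestamp"
    statement => "SELECT * FROM my_table WHERE last_modified > :sql_last_value ORDER BY last_modified"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "labs"
    # A changed row arrives with the same record_id and replaces the old document.
    document_id => "%{record_id}"
  }
}
```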

Dear @elasticforme and all,

It is now working for me (I am getting new data as well as updated data from SQL Server).

Thanks
HadoopHelp

Hi @elasticforme and all,

My Logstash job is now failing after some time and showing the error below:

Why is this happening? Please help, I need this urgently.

RAM: 16 GB
HD: 500 GB

Note: Elasticsearch is up and running!

Below is my Logstash configuration file:

input {
  jdbc {
    jdbc_driver_library => "C:\Users\Ramesh.kumar\Downloads\Elasticsearch\sqljdbc_4.2\enu\jre8\sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "-------------------------------------;"
    jdbc_user => "Ra--------mar"
    jdbc_password => "999999919"
    jdbc_validate_connection => true
    tracking_column => "id"
    use_column_value => true
    schedule => "*/20 * * * * *"
    statement => "SELECT *, ROW_NUMBER() OVER(ORDER BY HEALTHRECORDKEY) id from(
      SELECT DISTINCT S.HEALTHRECORDKEY,NULL PRODUCTCODE,S.FIRSTNAME,S.LASTNAME,NULL DRUGDESCRIPTION,
      PUL.LABTEST_TYPE_DESC,PUL.LABTEST_DATE, PUL.DIAG_SERVICE_LOCATION,PUL.LABTEST_RESULT_DATE
      FROM EDW.CONSENT.BENEFICIARY_DETAILS S
      INNER JOIN CDR.PHR_USER_HOSPITALIZATION PU ON S.HEALTHRECORDKEY = PU.HEALTHRECORDKEY
      INNER JOIN CDR.ORDER_INFO OI ON PU.HOSPITALIZATION_ID = OI.HOSPITALIZATION_ID
      INNER JOIN CDR.PHR_USER_LABTEST PUL ON OI.ORDER_ID = PUL.ORDER_ID

      UNION

      SELECT DISTINCT S.HEALTHRECORDKEY,PRODUCTCODE,S.FIRSTNAME,S.LASTNAME,DRUGDESCRIPTION,NULL,NULL,NULL,NULL
      FROM EDW.CONSENT.BENEFICIARY_DETAILS S
      INNER JOIN EDW.CDR.ERX_PATIENT_MEDICATION PM ON S.HEALTHRECORDKEY = PM.HEALTHRECORDKEY)a"
  }
}

output {
  elasticsearch {
    hosts => ["-,-,-,-:9200"]
    index => "final_row"
    document_type => "labs"
    document_id => "%{id}"
  }
}

Thanks
HadoopHelp

It says your Elasticsearch service is down.

How did you check that your Elasticsearch service is up and running?

Hi @elasticforme,

I say that because I am able to access the Elasticsearch index from Kibana and other apps.

We can also check with: http://localhost:9200/_cat/indices

The above request lists all indices in Elasticsearch.

It is working now, but why does this issue come up from time to time?

Thanks
HadoopHelp

Hi,

Can you show me your setup? I am fairly new to Logstash and I am trying to configure Logstash to get new data. The example of HadoopHelp does not work for me unfortunately, so I would really appreciate another example.
Thanks in advance!
