How to automatically update Elasticsearch index data from SQL Server using Logstash

Dear All,

Once more I come here with a somewhat complex, but not impossible, issue.

I am trying to fetch only changed/updated data from SQL Server into an Elasticsearch index using Logstash, but I am getting the error below:

    C:\Users\Ramesh.kumar\Downloads\Elasticsearch\logstash-7.1.1\bin>logstash -f Auto_test.conf

Sending Logstash logs to C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logs which is now configured via log4j2.properties
[2019-09-16T16:22:40,357][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-09-16T16:22:40,399][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.1"}
[2019-09-16T16:22:43,902][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, {, } at line 44, column 21 (byte 3029) after output {\r\n elasticsearch {\r\n hosts => ["wwww:9200"]\r\n index => "dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test" \r\n\tdocument_type => "caremanagertest"\r\n\tdocument_id => "%{"", :backtrace=>["C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:41:in compile_imperative'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:49:in compile_graph'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:11:in block in compile_sources'", "org/jruby/RubyArray.java:2577:in map'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:10:in compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in initialize'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:23:in initialize'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/pipeline_action/create.rb:36:in execute'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/agent.rb:325:in block in converge_state'"]}
[2019-09-16T16:22:59,119][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-09-16T16:23:03,821][INFO ][logstash.runner ] Logstash shut down.

Below is the Logstash config file, Auto_test.conf:

input {
jdbc {
jdbc_driver_library => "C:\Users\Ramesh.kumar\Downloads\Elasticsearch\sqljdbc_4.2\enu\jre8\sqljdbc42.jar"
#jdbc_driver_class => "com.microsoft.jdbc.sqlserver.SQLServerDriver"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "WWWWWWWWWWWWWWWWWWWWW"
jdbc_user => "wwwwwwwwww"
jdbc_password => "wwwwww"
jdbc_validate_connection => true
use_column_value => true
tracking_column => "patient_provider_hrk"
schedule => "*/2 * * * * *"
statement => "  Select DISTINCT  EDW.CONSENT.BENEFICIARY_DETAILS.HEALTHRECORDKEY as patient_provider_hrk,CAST(LOG_DATE AS DATE) AS LOG_DATE, CONVERT( VARCHAR, EDW.CONSENT.BENEFICIARY_DETAILS.dob, 101 ) as DOB1, 
CONCAT(CONCAT(PC1.LastName, ' '),PC1.FIRSTNAME) as  PCP, PC1.ProviderId as pid, pc.PROVIDER_TYPE, pc.ACTIVE,EDW.CONSENT.BENEFICIARY_DETAILS.MEDICARE,
CONCAT(CONCAT(EDW.CONSENT.BENEFICIARY_DETAILS.LASTNAME, ' '),EDW.CONSENT.BENEFICIARY_DETAILS.FIRSTNAME) as patientname 
, EDW.CONSENT.BENEFICIARY_DETAILS.SEX,
EDW.CONSENT.BENEFICIARY_DETAILS.DOB,EDW.CONSENT.BENEFICIARY_DETAILS.BENEFICIARY_STATUS,
edw.mobile.TEMP123.CARE_PROGRAM_NAME,
edw.mobile.TEMP123.CAREPROG_ID,EDW.CONSENT.BENEFICIARY_DETAILS.AGE,EDW.CONSENT.BENEFICIARY_DETAILS.CITY,
EDW.CONSENT.BENEFICIARY_DETAILS.phone_home,EDW.CONSENT.BENEFICIARY_DETAILS.EMAIL,
EDW.CONSENT.BENEFICIARY_DETAILS.MEDICARE,EDW.CONSENT.BENEFICIARY_DETAILS.REGIONNAME,
CONCAT(CONCAT(up.LAST_NAME, ' '),up.FIRST_NAME) as caremanager, up.user_id
from USER_TO_PATIENT 
left outer join EDW.CONSENT.BENEFICIARY_DETAILS on USER_TO_PATIENT.healthrecordkey =EDW.CONSENT.BENEFICIARY_DETAILS.healthrecordkey   
left outer join edw.mobile.TEMP123 on EDW.CONSENT.BENEFICIARY_DETAILS.healthrecordkey =  edw.mobile.TEMP123.healthrecordkey-- order by  1 desc
LEFT outer join EDW..PATIENT_PROVIDERS as pc on EDW.CONSENT.BENEFICIARY_DETAILS.healthrecordkey = pc.healthrecordkey 
LEFT JOIN (SELECT DISTINCT HEALTHRECORDKEY ,MAX(ISNULL(UPDATED_DATE, CREATED_DATE)) LOG_DATE
FROM CAREMANAGER..PATIENT_CONTACT_LOGS WITH (NOLOCK)
GROUP BY HEALTHRECORDKEY)PCL ON EDW.CONSENT.BENEFICIARY_DETAILS.HEALTHRECORDKEY = PCL.HEALTHRECORDKEY
LEFT outer join PROVIDERADMIN..CMSPROVIDER as PC1 on PC1.ProviderId=pc.CMSPROVIDER_ID
inner join PROVIDERADMIN.dbo.USERS AS up on  CAREMANAGER..USER_TO_PATIENT.user_id=up.USER_ID where CAREMANAGER..USER_TO_PATIENT.ACTIVE = 'Y' AND CAREMANAGER..USER_TO_PATIENT.RELATION in ('P', 'S')and BENEFICIARY_STATUS = 'active'
AND  ISNULL(PC1.STATUS,'Y') = 'Y' AND ISNULL(PC.PROVIDER_TYPE, 'P') = 'P' AND pc.ACTIVE = 'A'  order by patientname" 
}
}
output {
elasticsearch {
hosts => ["wwwwwwwww:9200"]
index => "dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test" 
document_type => "caremanagertest"
document_id => "%{"patient_provider_hrk"}"

}
}

Note: in the above query I am fetching data from multiple tables into a single index (dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test).

Please help me; I want to fetch only updated/changed data into the Elasticsearch index.

Thanks
HadoopHelp
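
The usual way to restrict the jdbc input to changed rows is to compare the tracking column against `:sql_last_value` in the query itself; the plugin substitutes the value it persisted after the previous run. A minimal sketch (the table and column names here are placeholders, not taken from the config above):

```
input {
  jdbc {
    # ...driver and connection settings as above...
    use_column_value     => true
    tracking_column      => "log_date"
    tracking_column_type => "timestamp"   # the tracked column must be numeric or a timestamp
    schedule             => "*/2 * * * * *"
    # only rows changed since the last run; :sql_last_value is
    # substituted by the plugin from its last-run metadata file
    statement => "SELECT * FROM some_table WHERE log_date > :sql_last_value"
  }
}
```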

That should be

document_id => "%{patient_provider_hrk}"
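
With that fix applied, the output section would look like this (the hosts value is the placeholder from the original post):

```
output {
  elasticsearch {
    hosts         => ["wwwwwwwww:9200"]
    index         => "dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test"
    document_type => "caremanagertest"
    # sprintf field reference: no inner quotes around the field name
    document_id   => "%{patient_provider_hrk}"
  }
}
```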

Hi @Badger.

Thanks

But now I am facing an issue with Logstash: when I run another jdbc query, it tracks the same fields, and that other Logstash query does not run. Why?

Below is the error:

[2019-09-19T13:48:32,668][INFO ][logstash.outputs.elasticsearch] Using default mapping template

[2019-09-19T13:48:32,687][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x85e4c2b run>"}
[2019-09-19T13:48:32,898][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-09-19T13:48:34,648][ERROR][logstash.javapipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: no implicit conversion of Integer into String>, :backtrace=>["uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:335:in `_parse'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:734:in `parse'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:87:in `set_value'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:36:in `initialize'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:29:in `build_last_value_tracker'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:216:in `register'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:191:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:190:in `register_plugins'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:280:in `start_inputs'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:244:in `start_workers'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:145:in `run'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:104:in `block in start'"], :thread=>"#<Thread:0x85e4c2b run>"}
[2019-09-19T13:48:34,671][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create, action_result: false", :backtrace=>nil}
[2019-09-19T13:48:35,041][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-09-19T13:48:39,948][INFO ][logstash.runner ] Logstash shut down.

I think this issue is happening because I ran the above jdbc query with a tracking column, but now I am not able to run any other jdbc query using Logstash to import other data.

Thanks
HadoopHelp
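
One possible cause of that `TypeError: no implicit conversion of Integer into String` in `value_tracking.rb`: by default the jdbc input persists the last tracked value in a single metadata file (`~/.logstash_jdbc_last_run`), so a second config with a different tracking column can read a stale value of the wrong type. A sketch, assuming each config is given its own state file and a type that matches its column (paths and column names are placeholders):

```
input {
  jdbc {
    # ...driver and connection settings...
    use_column_value     => true
    tracking_column      => "some_numeric_id"   # placeholder column
    tracking_column_type => "numeric"           # must match the column's actual type
    # give each pipeline its own state file so trackers don't collide
    last_run_metadata_path => "C:/logstash/.jdbc_last_run_other_query"
    statement => "SELECT * FROM other_table WHERE some_numeric_id > :sql_last_value"
  }
}
```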

Hi @Badger.

I am getting duplicate values when I applied the above change.

How can we stop the duplicate records?

Thanks
HadoopHelp
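
Duplicates can usually be avoided by making the Elasticsearch output idempotent: index each row under a stable, unique `document_id`, so a re-fetched row overwrites its earlier copy instead of creating a new document. A sketch based on the config above (`doc_as_upsert` is optional; it updates an existing document and creates it if missing):

```
output {
  elasticsearch {
    hosts => ["wwwwwwwww:9200"]
    index => "dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test"
    # a stable id per row: re-indexing the same row replaces it in place
    document_id   => "%{patient_provider_hrk}"
    action        => "update"
    doc_as_upsert => true
  }
}
```

Note that this only works if `patient_provider_hrk` is truly unique per logical record; if the join above produces several rows per key, they will overwrite each other instead of appearing as duplicates.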

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.