Dear All.
once more time come up here with some little complex but not impossible issue.
i am trying to fetch only changed /updated data from sql server to Elastic search index using log stash tools but i am getting error below:-
`<C:\Users\Ramesh.kumar\Downloads\Elasticsearch\logstash-7.1.1\bin>logstash -f Auto_test.conf
Sending Logstash logs to C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logs which is now configured via log4j2.properties
[2019-09-16T16:22:40,357][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-09-16T16:22:40,399][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.1"}
[2019-09-16T16:22:43,902][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, {, } at line 44, column 21 (byte 3029) after output {\r\n elasticsearch {\r\n hosts => ["wwww:9200"]\r\n index => "dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test" \r\n\tdocument_type => "caremanagertest"\r\n\tdocument_id => "%{"", :backtrace=>["C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:41:in compile_imperative'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:49:in
compile_graph'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:11:in block in compile_sources'", "org/jruby/RubyArray.java:2577:in
map'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:10:in compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in
initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in initialize'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:23:in
initialize'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/pipeline_action/create.rb:36:in execute'", "C:/Users/Ramesh.kumar/Downloads/Elasticsearch/logstash-7.1.1/logstash-core/lib/logstash/agent.rb:325:in
block in converge_state'"]}
[2019-09-16T16:22:59,119][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-09-16T16:23:03,821][INFO ][logstash.runner ] Logstash shut down./>`
below is log stash file : Auto_test.conf
ininput {jdbc {
jdbc_driver_library => "C:\Users\Ramesh.kumar\Downloads\Elasticsearch\sqljdbc_4.2\enu\jre8\sqljdbc42.jar"
#jdbc_driver_class => "com.microsoft.jdbc.sqlserver.SQLServerDriver"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "WWWWWWWWWWWWWWWWWWWWW"
jdbc_user => "wwwwwwwwww"
jdbc_password => "wwwwww"
jdbc_validate_connection => true
use_column_value => true
tracking_column => "patient_provider_hrk"
schedule => "*/2 * * * * *"
statement => " Select DISTINCT EDW.CONSENT.BENEFICIARY_DETAILS.HEALTHRECORDKEY as patient_provider_hrk,CAST(LOG_DATE AS DATE) AS LOG_DATE, CONVERT( VARCHAR, EDW.CONSENT.BENEFICIARY_DETAILS.dob, 101 ) as DOB1,
CONCAT(CONCAT(PC1.LastName, ' '),PC1.FIRSTNAME) as PCP, PC1.ProviderId as pid, pc.PROVIDER_TYPE, pc.ACTIVE,EDW.CONSENT.BENEFICIARY_DETAILS.MEDICARE,
CONCAT(CONCAT(EDW.CONSENT.BENEFICIARY_DETAILS.LASTNAME, ' '),EDW.CONSENT.BENEFICIARY_DETAILS.FIRSTNAME) as patientname
, EDW.CONSENT.BENEFICIARY_DETAILS.SEX,
EDW.CONSENT.BENEFICIARY_DETAILS.DOB,EDW.CONSENT.BENEFICIARY_DETAILS.BENEFICIARY_STATUS,
edw.mobile.TEMP123.CARE_PROGRAM_NAME,
edw.mobile.TEMP123.CAREPROG_ID,EDW.CONSENT.BENEFICIARY_DETAILS.AGE,EDW.CONSENT.BENEFICIARY_DETAILS.CITY,
EDW.CONSENT.BENEFICIARY_DETAILS.phone_home,EDW.CONSENT.BENEFICIARY_DETAILS.EMAIL,
EDW.CONSENT.BENEFICIARY_DETAILS.MEDICARE,EDW.CONSENT.BENEFICIARY_DETAILS.REGIONNAME,
CONCAT(CONCAT(up.LAST_NAME, ' '),up.FIRST_NAME) as caremanager, up.user_id
from USER_TO_PATIENT
left outer join EDW.CONSENT.BENEFICIARY_DETAILS on USER_TO_PATIENT.healthrecordkey =EDW.CONSENT.BENEFICIARY_DETAILS.healthrecordkey
left outer join edw.mobile.TEMP123 on EDW.CONSENT.BENEFICIARY_DETAILS.healthrecordkey = edw.mobile.TEMP123.healthrecordkey-- order by 1 desc
LEFT outer join EDW..PATIENT_PROVIDERS as pc on EDW.CONSENT.BENEFICIARY_DETAILS.healthrecordkey = pc.healthrecordkey
LEFT JOIN (SELECT DISTINCT HEALTHRECORDKEY ,MAX(ISNULL(UPDATED_DATE, CREATED_DATE)) LOG_DATE
FROM CAREMANAGER..PATIENT_CONTACT_LOGS WITH (NOLOCK)
GROUP BY HEALTHRECORDKEY)PCL ON EDW.CONSENT.BENEFICIARY_DETAILS.HEALTHRECORDKEY = PCL.HEALTHRECORDKEY
LEFT outer join PROVIDERADMIN..CMSPROVIDER as PC1 on PC1.ProviderId=pc.CMSPROVIDER_ID
inner join PROVIDERADMIN.dbo.USERS AS up on CAREMANAGER..USER_TO_PATIENT.user_id=up.USER_ID where CAREMANAGER..USER_TO_PATIENT.ACTIVE = 'Y' AND CAREMANAGER..USER_TO_PATIENT.RELATION in ('P', 'S')and BENEFICIARY_STATUS = 'active'
AND ISNULL(PC1.STATUS,'Y') = 'Y' AND ISNULL(PC.PROVIDER_TYPE, 'P') = 'P' AND pc.ACTIVE = 'A' order by patientname"
}
}
output {
elasticsearch {
hosts => ["wwwwwwwww:9200"]
index => "dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test"
document_type => "caremanagertest"
document_id => "%{"patient_provider_hrk"}"
}
}
Note : in above query i am fetching data from multiple table into single index : (dbdataindex_join_allcolumns_qa_modify_cm_28_08_2019_test).
please help me ,i want to fetch only updated /changed data into Elasticsearch index.
Thanks
HadoopHelp