Hello All,
I am using the jdbc input and the jdbc_streaming filter in my Logstash config to bring data from two different tables into the same index.
My logstash config is below.
input {
  # Poll the Marketing table through JDBC and emit one event per returned row.
  jdbc {
    # --- driver ---
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/solr-index/Logstash/logstash-7.12.1/logstash-core/lib/jars/postgresql-jdbc.jar"

    # --- connection ---
    jdbc_connection_string => "abc"
    jdbc_user => "user"
    jdbc_password => "xyz"

    # --- query ---
    # Keep the mixed-case column aliases ("CaseID", "CaseTypeID") exactly as the statement declares them.
    lowercase_column_names => false
    statement => 'SELECT PYID AS "CaseID",3 AS "CaseTypeID" FROM U90CCMWT.Marketing limit 10'

    # Cron-style schedule; this expression runs the statement once every minute.
    schedule => "* * * * *"

    # Paging is currently switched off:
    #jdbc_paging_enabled => "true"
    #jdbc_page_size => "300"
  }
}
filter {
  # Enrich every event from the jdbc input with the localized case-type names
  # looked up in dbo.dtac_case_type_all_languages.
  jdbc_streaming {
    jdbc_connection_string => "abc"
    jdbc_user => "user"
    jdbc_password => "xyz"
    jdbc_driver_library => "/solr-index/Logstash/logstash-7.12.1/logstash-core/lib/jars/postgresql-jdbc.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # :case_type_id is a named placeholder that is bound through `parameters` below.
    # (A double-quoted "0" would be read by PostgreSQL as an identifier, not as a value,
    # and would leave the declared parameter unused.)
    statement => 'select case_type_de AS "CaseType_de",case_type_en AS "CaseType_en",case_type_es AS "CaseType_es",case_type_fi AS "CaseType_fi",case_type_fr AS "CaseType_fr",case_type_hi AS "CaseType_hi",case_type_it AS "CaseType_it",case_type_pl AS "CaseType_pl",case_type_pt AS "CaseType_pt",case_type_ru AS "CaseType_ru",case_type_sv AS "CaseType_sv",case_type_th AS "CaseType_th",case_type_tr AS "CaseType_tr",case_type_zh AS "CaseType_zh" from dbo.dtac_case_type_all_languages where case_type_id = :case_type_id'
    # Placeholder name => event field whose value is substituted into the statement.
    parameters => {"case_type_id" => "CaseTypeID"}
    # The lookup result is written to the "table1" field of the event.
    target => "table1"
  }
  #json { source => "table1" } # didn't work
  # Produce one event per element of "table1", so "table1" holds a single row object afterwards.
  split { field => "table1" }
}
output {
  # Debug aid: print each event to the terminal as JSON.
  # Comment this output out once the pipeline behaves as expected.
  stdout {
    codec => "json"
  }

  # Index each event into Elasticsearch.
  elasticsearch {
    hosts => ["http://ip_node:9200"]
    index => "vikings_es1"
    # Currently disabled options:
    #document_id => "{101}"
    #doc_as_upsert => true # upserts documents (e.g. if the document does not exist, creates a new record)
  }
}
Actual indexing:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 10,
"max_score" : 1.0,
"hits" : [
{
"_index" : "vikings_es1",
"_type" : "doc",
"_id" : "3B8UvHkBNEGMCKzqWc8g",
"_score" : 1.0,
"_source" : {
"CaseID" : "MKT-2062",
"@timestamp" : "2021-05-30T07:02:00.408Z",
"CaseTypeID" : 3,
"table1" : {
"CaseType_tr" : "Warranty",
"CaseType_pl" : "Warranty",
"CaseType_fr" : "Garantie",
"CaseType_pt" : "Garantia",
"CaseType_sv" : "Garanti",
"CaseType_ru" : "Гарантия",
"CaseType_th" : "Warranty",
"CaseType_fi" : "Takuu",
"CaseType_zh" : "三包",
"CaseType_es" : "Garantía",
"CaseType_it" : "Garanzia",
"CaseType_en" : "Warranty",
"CaseType_de" : "Garantie",
"CaseType_hi" : "वारंटी"
},
"@version" : "1"
}
},
Desired indexing:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 10,
"max_score" : 1.0,
"hits" : [
{
"_index" : "vikings_es1",
"_type" : "doc",
"_id" : "3B8UvHkBNEGMCKzqWc8g",
"_score" : 1.0,
"_source" : {
"CaseID" : "MKT-2062",
"@timestamp" : "2021-05-30T07:02:00.408Z",
"CaseTypeID" : 3,
"CaseType_tr" : "Warranty",
"CaseType_pl" : "Warranty",
"CaseType_fr" : "Garantie",
"CaseType_pt" : "Garantia",
"CaseType_sv" : "Garanti",
"CaseType_ru" : "Гарантия",
"CaseType_th" : "Warranty",
"CaseType_fi" : "Takuu",
"CaseType_zh" : "三包",
"CaseType_es" : "Garantía",
"CaseType_it" : "Garanzia",
"CaseType_en" : "Warranty",
"CaseType_de" : "Garantie",
"CaseType_hi" : "वारंटी",
"@version" : "1"
}
},
Please help me get the desired indexing, i.e. with the fields that are currently nested under "table1" placed at the top level of the document.