I want to synchronize 2 postgres tables with 3 elasticsearch indexes.
I'm going to split one of the tables and synchronize it with two indexes.
so I'm cloning one of those tables. Divide the cloned things into 2 indexes and make outputs.
In other words, there are three outputs. I wonder if it is possible to do this in one conf file. I want you to let me know if anyone knows about this. Share your experience.
here is my code below.
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
jdbc_user => "atlas"
jdbc_password => "atlas"
jdbc_validate_connection => true
jdbc_driver_library => "/lib/postgres-42-test.jar"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "select *, (select json_build_object('region_id', region_id, 'region_name', region_name, 'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::jsonb from expedia_region_union where region_id = c.region_id ) as region_json from expedia_airport_more c"
type => "A"
}
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/atlasdb?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
jdbc_user => "atlas"
jdbc_password => "atlas"
jdbc_validate_connection => true
jdbc_driver_library => "/lib/postgres-42-test.jar"
jdbc_driver_class => "org.postgresql.Driver"
schedule => "* * * * *"
statement => "SELECT region_id, region_name, region_name_full, region_name_kr, continent::text, country::text from expedia_region_union order by region_id asc limit 100"
type => "B"
}
stdin {
codec => plain { charset => "UTF-8"}
}
}
filter {
ruby {
code => "
require 'json'
continent_json = JSON.parse(event.get('continent').to_s)
event.set('continent', continent_json)
country_json = JSON.parse(event.get('country').to_s)
event.set('country', country_json)
"
}
date {
match => ['time', 'UNIX']
}
if [type] == 'B' {
clone {
clones => ['B-1', 'B-2']
}
if [type] == 'B-1' {
mutate {
add_field => {"[@metadata][type]" => "B-1-test"}
}
} else {
mutate {
add_field => {"[@metadata][type]" => "B-2-test"}
}
}
} else {
mutate {
add_field => {"[@metadata][type]" => "A-test"}
}
}
}
output {
stdout { codec => rubydebug }
if [@metadata][type] == 'A-test' {
elasticsearch {
hosts => ["localhost:9200"]
index => "2020-06-19-fri"
doc_as_upsert => true
action => "update"
document_id => "%{region_id}"
}
}
else if [@metadata][type] == 'B-1-test' {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "B-1-test"
doc_as_upsert => true
action => "update"
document_id => "%{region_id}"
}
} else {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "B-2-test"
doc_as_upsert => true
action => "update"
document_id => "%{region_id}"
}
}
}