Maybe you can try my solution:
input {
  # Three jdbc inputs read from the same MySQL database. They differ only in
  # the table they select from and the `type` value stamped on their events;
  # the filter and output sections branch on that `type`.
  #
  # NOTE(review): none of the inputs sets last_run_metadata_path, so with
  # record_last_run enabled they presumably all persist state to the plugin's
  # default metadata file — confirm that sharing it is acceptable.

  # --- product table ---
  jdbc {
    # Location of the downloaded MySQL JDBC driver jar and its driver class
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # MySQL JDBC connection string to the backup database
    jdbc_connection_string => "jdbc:mysql://localhost:3306/price_prod_test"
    # Credentials of the user the statement is executed as
    jdbc_user => "user"
    jdbc_password => "pass"
    # Query to run, and the type label attached to the rows it returns
    statement => "SELECT * FROM product"
    type => "product"
    # Keep track of the last run
    record_last_run => true
  }

  # --- merchant table ---
  jdbc {
    # Location of the downloaded MySQL JDBC driver jar and its driver class
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # MySQL JDBC connection string to the backup database
    jdbc_connection_string => "jdbc:mysql://localhost:3306/price_prod_test"
    # Credentials of the user the statement is executed as
    jdbc_user => "user"
    jdbc_password => "pass"
    # Query to run, and the type label attached to the rows it returns
    statement => "SELECT * FROM merchant"
    type => "merchant"
    # Keep track of the last run
    record_last_run => true
  }

  # --- news table ---
  jdbc {
    # Location of the downloaded MySQL JDBC driver jar and its driver class
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # MySQL JDBC connection string to the backup database
    jdbc_connection_string => "jdbc:mysql://localhost:3306/price_prod_test"
    # Credentials of the user the statement is executed as
    jdbc_user => "user"
    jdbc_password => "pass"
    # Query to run, and the type label attached to the rows it returns
    statement => "SELECT * FROM news"
    type => "news"
    # Keep track of the last run
    record_last_run => true
  }
}
filter {
  # Per-type enrichment. The branches are mutually exclusive because each one
  # compares the same [type] field against a different literal.
  if [type] == "product" {
    mutate {
      # Build the nested product_suggest object from the row's display_name
      # and id, and copy id into id_sort.
      add_field => {
        "[product_suggest][input]" => "%{display_name}"
        "[product_suggest][weight]" => "%{id}"
        "id_sort" => "%{id}"
      }
      # Drop the attribute value columns from the event.
      remove_field => ["attribute_value_en", "attribute_value_sc"]
    }
  } else if [type] == "merchant" {
    mutate {
      # Build the nested merchant_suggest object from merchant_name and id.
      add_field => {
        "[merchant_suggest][input]" => "%{merchant_name}"
        "[merchant_suggest][weight]" => "%{id}"
      }
    }
  } else if [type] == "news" {
    mutate {
      # Copy id into id_sort.
      add_field => { "id_sort" => "%{id}" }
    }
  }
}
output {
  # Progress indicator on stdout using the dots codec, applied to every event.
  stdout { codec => dots }

  # Route each event to the Elasticsearch index named after its type, using
  # the row's id as the document id.
  if [type] == "product" {
    elasticsearch {
      hosts => "localhost"
      index => "product"
      document_id => "%{id}"
    }
  } else if [type] == "merchant" {
    elasticsearch {
      hosts => "localhost"
      index => "merchant"
      document_id => "%{id}"
    }
  } else if [type] == "news" {
    elasticsearch {
      hosts => "localhost"
      index => "news"
      document_id => "%{id}"
    }
  }
}