There is quite a significant delay occurring when there is updation in the database. It takes up to one to half an hour for the change to be reflected in the elasticsearch index. However if I change another product or change another field of the product, then the previous update will appear in the index immediately but the new change won't be reflected.
This is my logstash configuration.
# Postgres to ES
input {
jdbc {
tags => ["index-1"]
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/database"
jdbc_user => "xxxx"
jdbc_password => "xxxx"
jdbc_validate_connection => "true"
jdbc_driver_library => "postgresql.jar"
jdbc_driver_class => "org.postgresql.Driver"
tracking_column => "last_change_on"
use_column_value => true
clean_run => true
tracking_column_type => "timestamp"
schedule => "*/5 * * * * *"
statement => "select * from table_1 where last_change_on >= :sql_last_value"
jdbc_paging_enabled => true
jdbc_page_size => 1000
}
jdbc {
tags => ["index-2"]
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/database"
jdbc_user => "xxxx"
jdbc_password => "xxxx"
jdbc_validate_connection => "true"
jdbc_driver_library => "postgresql.jar"
jdbc_driver_class => "org.postgresql.Driver"
tracking_column => "last_change_on"
use_column_value => true
clean_run => true
tracking_column_type => "timestamp"
schedule => "*/5 * * * * *"
statement => "select * from table_2 where last_change_on >= :sql_last_value"
jdbc_paging_enabled => true
jdbc_page_size => 1000
}
}
filter
{
if 'index_1' in [tags]
{
aggregate
{
task_id => "%{id}"
code => "map['id'] = event.get('id')
map['name'] = event.get('name')
map['suggest'] = event.get('suggest')
event.cancel()"
push_previous_map_as_event => true
timeout_tags => ['aggregated']
}
if "aggregated" not in [tags]
{
drop {}
}
clone {
clones => ['clone_for_indexing', 'clone_for_suggestions']
}
if [type] == 'clone_for_suggestions' {
prune {
whitelist_names => ["id", "suggest"]
}
mutate {
add_field => { "[@metadata][type]" => "suggestions-index-1" }
}
}
if [type] == 'clone_for_indexing' {
mutate {
add_field => { "[@metadata][type]" => "index-1" }
remove_field => ["@version" , "@timestamp", "tags", "type", "suggest"]
}
}
}
if 'index_2' in [tags]
{
aggregate
{
task_id => "%{id}"
code => "map['id'] = event.get('id')
map['name'] = event.get('name')
map['suggest'] = event.get('suggest')
event.cancel()"
push_previous_map_as_event => true
timeout_tags => ['aggregated']
}
if "aggregated" not in [tags] {
drop {}
}
clone {
clones => ['clone_for_indexing', 'clone_for_suggestions']
}
if [type] == 'clone_for_suggestions' {
prune {
whitelist_names => ["id", "suggest"]
}
mutate {
add_field => { "[@metadata][type]" => "suggestions-index-2" }
}
}
if [type] == 'clone_for_indexing' {
mutate {
add_field => { "[@metadata][type]" => "index-2" }
remove_field => ["@version" , "@timestamp", "tags", "type", "suggest"]
}
}
}
}
output {
stdout {
codec => rubydebug {
metadata => true
}
}
if [@metadata][type] == 'suggestions-index-1' {
elasticsearch {
index => "index-1-search"
hosts => ["http://localhost:9200"]
document_id => "%{id}"
}
}
if [@metadata][type] == 'index-1' {
elasticsearch {
index => "index-1"
hosts => ["http://localhost:9200"]
document_id => "%{id}"
}
}
if [@metadata][type] == 'index-2'
{
elasticsearch {
index => "index-2"
hosts => ["http://localhost:9200"]
document_id => "%{id}"
}
}
if [@metadata][type] == 'suggestions-index-2'
{
elasticsearch {
index => "index-2-search"
hosts => ["http://localhost:9200"]
document_id => "%{id}"
}
}
}
When I checked the logs, the printed SQL statement executes properly in the pgadmin query tool however the product is still not indexed in the elasticsearch indexes.