Logstash doesn't index to elasticsearch properly/delay in indexing

There is quite a significant delay occurring when there is updation in the database. It takes up to one to half an hour for the change to be reflected in the elasticsearch index. However if I change another product or change another field of the product, then the previous update will appear in the index immediately but the new change won't be reflected.
This is my logstash configuration.

# Postgres to ES
input {
    jdbc {
        tags => ["index-1"]
        jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/database"
        jdbc_user => "xxxx"
        jdbc_password => "xxxx"
        jdbc_validate_connection => "true"
        jdbc_driver_library => "postgresql.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "last_change_on"
        use_column_value => true
	clean_run => true
        tracking_column_type => "timestamp"
        schedule => "*/5 * * * * *"
        statement => "select * from table_1 where last_change_on >= :sql_last_value"
        jdbc_paging_enabled => true
        jdbc_page_size => 1000
    }
    jdbc {
        tags => ["index-2"]
        jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/database"
        jdbc_user => "xxxx"
        jdbc_password => "xxxx"
        jdbc_validate_connection => "true"
        jdbc_driver_library => "postgresql.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "last_change_on"
        use_column_value => true
	clean_run => true
        tracking_column_type => "timestamp"
        schedule => "*/5 * * * * *"
        statement => "select * from table_2 where last_change_on >= :sql_last_value"
        jdbc_paging_enabled => true
        jdbc_page_size => 1000
    }
}
filter
{
    if 'index_1' in [tags]
    {
        aggregate
        {
            task_id => "%{id}"
            code => "map['id'] = event.get('id')
                     map['name'] = event.get('name')
                     map['suggest'] = event.get('suggest')
                     event.cancel()"
            push_previous_map_as_event => true
            timeout_tags => ['aggregated']
        }
        if "aggregated" not in [tags]
        {
            drop {}
        }
	    clone {
            clones => ['clone_for_indexing', 'clone_for_suggestions']
        }
        if [type] == 'clone_for_suggestions' {
            prune {
                whitelist_names => ["id", "suggest"]
            }
            mutate {
                add_field => { "[@metadata][type]" => "suggestions-index-1" }
            }
        }
        if [type] == 'clone_for_indexing' {
            mutate {
                add_field => { "[@metadata][type]" => "index-1" }
            	remove_field => ["@version" , "@timestamp", "tags", "type", "suggest"]
            }
        }
    }
    if 'index_2' in [tags]
    {
        aggregate
        {
            task_id => "%{id}"
            code => "map['id'] = event.get('id')
                     map['name'] = event.get('name')
                     map['suggest'] = event.get('suggest')
                     event.cancel()"
            push_previous_map_as_event => true
            timeout_tags => ['aggregated']
        }
        if "aggregated" not in [tags] {
            drop {}

        }
        clone {
            clones => ['clone_for_indexing', 'clone_for_suggestions']
        }
        if [type] == 'clone_for_suggestions' {
            prune {
                whitelist_names => ["id", "suggest"]
            }
            mutate {
                add_field => { "[@metadata][type]" => "suggestions-index-2" }
            }
        }
        if [type] == 'clone_for_indexing' {
            mutate {
                add_field => { "[@metadata][type]" => "index-2" }
            	remove_field => ["@version" , "@timestamp", "tags", "type", "suggest"]
            }
        }
    }
}
output {
    stdout {
             codec =>  rubydebug {
                 metadata => true
             }
    }
   if [@metadata][type] == 'suggestions-index-1' {
       elasticsearch {
                 index => "index-1-search"
                 hosts => ["http://localhost:9200"]
                 document_id => "%{id}"
       }
   }
   if [@metadata][type] == 'index-1' {
       elasticsearch {
                 index => "index-1"
                 hosts => ["http://localhost:9200"]
                 document_id => "%{id}"
       }
   }
   if [@metadata][type] == 'index-2'
   {
       elasticsearch {
                index => "index-2"
                hosts => ["http://localhost:9200"]
                document_id => "%{id}"
            }
   }
   if [@metadata][type] == 'suggestions-index-2' 
   {
	elasticsearch {
		index => "index-2-search"
		hosts => ["http://localhost:9200"]	
		document_id => "%{id}"
        }
   }
}

When I checked the logs, the printed SQL statement executes properly in the pgadmin query tool however the product is still not indexed in the elasticsearch indexes.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.