Synchronize Index with Database in Logstash

I am trying to read from my database periodically on a schedule, every 5 minutes. I noticed that each time I read from the database, the index doubles in size. I only want changes in the database to be transferred to my index. Is there a way to synchronize the data so that only new data is saved?

input {
  jdbc {
    # Oracle JDBC connection string to our database, mydb
    jdbc_connection_string => "jdbc:oracle:thin:@//192.168.229.129:1521/XE"

    # The user we wish to execute our statement as
    jdbc_user => "**"
    jdbc_password => "***"
    jdbc_validate_connection => true

    # The path to our downloaded JDBC driver
    jdbc_driver_library => "/home/siteadm/lib/ojdbc6-11.2.0.1.0.jar"

    # The name of the driver class for Oracle
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"

    # Run every 5 minutes
    schedule => "*/5 * * * *"

    # Our query
    statement => "SELECT r.id as request_id, r.tid as tid, r.central_t_id as central_t_id, r.t_location_title as financial_institution, r.t_location_address_line1 as fi_address, r.t_location_city as fi_city, r.t_location_state_code as fi_state, r.t_location_zip as fi_zip, t.latitude as latitude, t.longitude as longitude, ag.REQUEST_ID_PREFIX as AGENCY_PREFIX from requests r inner join t_locations t on r.t=t.id inner join agencies ag on r.AGENCY_ADJUDICATOR=ag.fallback_user_id where r.updated_date > sysdate-20"
  }
}

filter {
  mutate {
    # Adding to the same field twice builds a [lon, lat] array
    add_field => ["[lonlat]", "%{longitude}"]
    add_field => ["[lonlat]", "%{latitude}"]
  }
  mutate {
    convert => [ "[lonlat]", "float" ]
  }
}

output {
  elasticsearch {
    index => "agency-%{+YYYY.MM.dd}"
    #document_type => "request"
    #document_id => "%{request_id}"
    hosts => ["localhost"]
  }
  stdout { codec => dots }
}

Logstash keeps track of either

  • the last time the query was run, or
  • a column value of the last row read,

and you can use that value in your query via the sql_last_value named parameter to select only recently added or updated rows. See the jdbc input plugin documentation for details.
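As a sketch of how that fits into your config (the tracking settings shown are assumptions; adjust them to your schema): with use_column_value and tracking_column, :sql_last_value holds the highest updated_date seen so far, and the tracking column must also appear in the SELECT list so the plugin can read it from each row. Setting document_id in the output makes a re-read row overwrite its existing document instead of being indexed again.

    input {
      jdbc {
        # ... connection settings as above ...
        schedule => "*/5 * * * *"

        # Persist the highest updated_date between runs and expose it
        # to the query as :sql_last_value
        use_column_value => true
        tracking_column => "updated_date"
        tracking_column_type => "timestamp"

        # Select the tracking column and fetch only rows changed
        # since the last run
        statement => "SELECT r.id as request_id, r.updated_date, ... from requests r ... where r.updated_date > :sql_last_value"
      }
    }

    output {
      elasticsearch {
        hosts => ["localhost"]
        index => "agency"
        # Reuse the primary key so updated rows replace the
        # existing document rather than duplicating it
        document_id => "%{request_id}"
      }
    }

Note that a date-based index name like agency-%{+YYYY.MM.dd} works against this deduplication: the same request_id in a different day's index is a separate document, so a single index is used in this sketch.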