DB streaming from last input

Hi, I'm using Logstash to extract data from an Oracle database and send it to Elasticsearch (and on to Kibana).
What I want is for Elasticsearch to keep the past data while Logstash fetches only newly inserted rows from my Oracle database. For instance, if rows [No.1] ~ [No.10] have already been indexed into Elasticsearch, I want Logstash to read only rows [No.11] ~ [No. most recent] and store them in Elasticsearch. Since I couldn't figure that out, I used a duplicate filter as a workaround; please refer to the code below to see how I did it.

  1. Read all rows up to now:

TO_CHAR(DATE, 'yyyy-mm-dd HH24:MI') < TO_CHAR(SYSDATE, 'yyyy-mm-dd HH24:MI')

  2. Remove the duplicates:

filter {
  mutate {
    add_field => {
      "[@metadata][document_id]" => "%{IDX}"
    }
  }
}

But I guess this is a waste of resources, since Logstash reads all the data first and only then removes the duplicates. Is there a better way to improve my setup?

Thanks

Best

gee


P.S.

My logstash.conf file is as follows:

input {
  jdbc {
    jdbc_validate_connection =>
    jdbc_connection_string =>
    jdbc_user =>
    jdbc_password =>
    jdbc_driver_library =>
    jdbc_driver_class =>
    statement => "
      SELECT IDX, DATE, PRICE, PROFIT
      FROM TABLE_NAME
      WHERE
        TO_CHAR(DATE, 'yyyy-mm-dd HH24:MI') < TO_CHAR(SYSDATE, 'yyyy-mm-dd HH24:MI')
    "
  }
}

filter {
  mutate {
    add_field => {
      "[@metadata][document_id]" => "%{IDX}"
    }
  }
}

output {
  elasticsearch {
    index =>
    hosts =>
    user =>
    password =>
    document_id => "%{[@metadata][document_id]}"
  }
}

I've omitted the sensitive parts (e.g. user, password).

The jdbc input does this for you. Set its tracking_column option to the name of the timestamp column and enable the feature with use_column_value. The plugin will then maintain a query parameter holding the timestamp (or whatever the tracking_column column contains) from the previous run, allowing you to select only entries newer than that. See the plugin documentation for an example.
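For what it's worth, here is a minimal sketch of how that could look with your query. It assumes you track the numeric IDX column; if you'd rather track the DATE column, set tracking_column_type => "timestamp" instead. The connection settings are placeholders to be filled in from your existing config:

input {
  jdbc {
    # Connection settings omitted; same as in your existing config
    jdbc_connection_string => "..."
    jdbc_user => "..."
    jdbc_password => "..."
    jdbc_driver_library => "..."
    jdbc_driver_class => "..."
    schedule => "* * * * *"            # poll the table once per minute
    use_column_value => true           # track a column value instead of the run time
    tracking_column => "idx"           # column names are lowercased by the plugin by default
    tracking_column_type => "numeric"  # IDX looks numeric; use "timestamp" for DATE
    statement => "
      SELECT IDX, DATE, PRICE, PROFIT
      FROM TABLE_NAME
      WHERE IDX > :sql_last_value
      ORDER BY IDX
    "
  }
}

On each run the plugin substitutes :sql_last_value with the highest value seen on the previous run (persisted by default in a .logstash_jdbc_last_run file in your home directory, configurable via last_run_metadata_path), so only newer rows are fetched. Keeping your document_id filter as well is a sensible safety net against the occasional re-read.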

Thanks for the reply!
I'll check it out right away and come back to leave a comment.