How logstash jdbc plugin fetch data from database

I have informix database that contain tons of tables and records that need to join some of them and send to elasticsearch. Result of this join are 70 columns and 100M records.

Here is the requirements:
1-For first time that i run this logstash config expect all data send to elasticsearch.
2-after first fetch need only “new records” and “updated records” in database updating in elasticsearch without fetch whole data from scratch.

How should I configure logstash and elasticsearch to do this?

Any idea?

I have done this many time over

you write logstash or any way you want to get record from this database

for example your table is only 100 days old.
and assuming you want to collect new record every five min. and updated is date column in your RDBMS database which updated this field when record is updated.

select x,y,z from table1, table2 where some conditions
and updated > sysdate - 100 ( give you last 100 days record)

once you get all the 100 days record. you put logstash in pipeline run it every five min and add
updated > sysdate - interval 5 minute

now you only reading last five min record.

input {
    jdbc {
        jdbc_validate_connection => true
        jdbc_connection_string => "jdbc:oracle:thin:@server1:port/dbname"
        jdbc_user => "logstash"
        jdbc_password => "${LOGSTASH_PASSWORD}"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        statement_filepath => "/etc/logstash/conf.d/sql/minute_job.sql"
        schedule => "05,10,15,20,25,30,35,40,45,55 * * * *"


1-How about first run? e.g i run without schedule parameters for first time but it will fetch again records after one round finish! Records duplicated!

2-what does the clean run?

The JDBC input plugin supports a sql_last_value parameter that allows you to create a SQL statement that only retrieves rows that have a timestamp newer than what was last fetched. In order to use this you need to make sure your data gets a timestamp updated whenever it is added or updated. If you need to handle deleted you may need to do so through soft deletes so they can be captured this way. You then need to create your join query so that you can select appropriate rows based on timestamp.

@Christian_Dahlqvist the problem is after join those table there is no timestamp columns. So what should i do in case? i have no timestamp column!

In order to use the JDBC plugin and only pick up new or updated data you need to add a timestamp. You could e.g. put a trigger on the tables in the join and update a timestamp on change. The join query could them get the max timestamp of the joined data and compare this to the sql_last_value for selection. The JDBC plugin runs a query and you need to be able to create this so it only gets new data.

What happen in this case:
1-logstash fetch table for first time.
2-one column of specific row change in database.

What happen in elastic index? It will index two event with different timestamp or overwrite single event whenever new change occurs?

In order for updates to take place you need to specify the document ID based on data in the document so it is always unique. If you do this an update will have the same ID as the original document and it will be updated as a result. This relies on you using a single index and not time-based indices, e.g. data streams.

@Christian_Dahlqvist I have unique column in result of join tables that call “id”, should I consider this?
If the answer is yes which parameter should i add to logstash config?

Does that uniquely identify each document that comes out as a result of the join and not just one component within the join?

If it does you can specify this as the ID using the document_id parameter in the Elasticsearch output. If it is not unique you may need to combine several fields into a unique ID.

1-Ok, i cannot decided which column should be use as track column.
And there are 7 tables that join together and each column might change over time, how logstash able to find them and trigger to run join query with minimum impact on database?

2-is there any way to find informix jdbc driver support sql_last_value parameter? I try to use it but logstash return syntax error and not execute join query!

You need to craft a SQL query (which may be a join) that retrieves the correct set of data based on using sql_last_value as criteria. There is no magic way.

@Christian_Dahlqvist I have join query that work without sql_last_value, but when i add sql_last_value to query logatash won’t run query, i suspect to infomix jdbc that support othis feature or not!

Would you please tell me which column more suitable to use as track column? And in case no column work for this aim, should i create some table ot column to show which part of those 7 table changed and set flag in this table to set it as track column?

Any other idea?

It is not a native feature - you need to compare the sql_last_value to one or more of your columns and filter that way. If you e.g. had a join of 3 tables and each table had a last_updated column populated upon change you could select only join records where one or more of these columns is greater than sql_last_value (indicates that at least one component within the join has been updated and the record as a result should be updated in Elasticsearch). If you do not have such columns you may need to alter your schema.

@Christian_Dahlqvist last_updated column values update automatically or should update manually after each changes?
We have 2 scenerio here 1-changes that made by application, 2-changes that made by dba.

How last_updated will be update in this situation? And is this a common to this on database?


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.