Well, I've changed the Ruby code of the jdbc plugin to this:
public
def get_column_value(row)
  if !row.has_key?(@tracking_column.to_sym)
    # MongoDB rows come back nested under :document, so look for the
    # tracking column one level down before giving up
    if !row.dig(:document, @tracking_column).nil?
      @sql_last_value = row.dig(:document, @tracking_column)
    elsif !@tracking_column_warning_sent
      @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
      @tracking_column_warning_sent = true
    end
    # If we can't find the tracking column, return the current value in the ivar
    @sql_last_value
  else
    # Otherwise send the updated tracking column
    row[@tracking_column.to_sym]
  end
end
The query returns documents like this: {:document => { ...dataOfMongo... }}. So if the tracking_column is not at the first level of the row (as it is in MySQL), we have to search for that key at the second level; that's why I added this check:
if !row.dig(:document, @tracking_column).nil?
  @sql_last_value = row.dig(:document, @tracking_column)
If it's not nil, the tracking_column value is stored in @sql_last_value, as it should be.
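To see why the dig works, here is a quick sanity check with a made-up row shaped like the query result (the key name and timestamp value are only illustrative):

  row = { :document => { "updatedat" => "2018-05-01T12:00:00Z" } }
  tracking_column = "updatedat"

  row.has_key?(tracking_column.to_sym)  #=> false, not at the first level
  row.dig(:document, tracking_column)   #=> "2018-05-01T12:00:00Z"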
Now, all I have to do is edit the statement so that it sorts by that column and limits the query, like this:
statement => "db.getCollection('collection').find({ updatedat: { $gte: :sql_last_value}},{'_id':0}).sort({updatedat:1}).limit(1000)"
And every minute the pipeline will fetch up to 1000 new documents (if they exist) since sql_last_value; otherwise sql_last_value will stay the same.
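For anyone wiring this up end to end, this is roughly how I'd expect the input block to look. The driver library, class, and connection string below are placeholders for whatever Mongo JDBC driver you are using; the statement is the one from above:

  input {
    jdbc {
      jdbc_driver_library => "/path/to/mongo-jdbc-driver.jar"        # placeholder
      jdbc_driver_class => "com.example.MongoJdbcDriver"             # placeholder
      jdbc_connection_string => "jdbc:mongo://localhost:27017/mydb"  # placeholder
      schedule => "* * * * *"          # run every minute
      statement => "db.getCollection('collection').find({ updatedat: { $gte: :sql_last_value}},{'_id':0}).sort({updatedat:1}).limit(1000)"
      use_column_value => true         # make :sql_last_value track a column
      tracking_column => "updatedat"   # the column the patched code looks up
      tracking_column_type => "timestamp"
    }
  }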
Hope it helps.