Tracking_column not found in dataset when using JDBC driver with MongoDB (input)

I'm quite unclear on how to use :sql_last_value in my MongoDB query statement, like:

statement => "var lastId = ':sql_last_value'; db.docs.find({ _id: { $gte : lastId }})"

My config is like this:

input {
  jdbc {
    jdbc_driver_library => "C:\Users\scott\Downloads\logstash-7.8.1\mongojdbc2.2.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://localhost:27017/edb"
    jdbc_user => "admin"
    schedule => "* * * * *"
    tracking_column => "_id"
    use_column_value => true
    statement => "var lastId = ':sql_last_value'; db.docs.find({ _id: { $gte : lastId }})"
  }
}


output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "edb"
  }
}

When I try to use this it gives an error:

 tracking_column not found in dataset. {:tracking_column=>"_id"}

Here is a sample MongoDB doc:


{
  "_id": "56ea4034559c3908e0199453",
  "56e9e2dca39c3453459173_initials": "",
  "56e9e2dc345358e0199173_date_range": "",
  "_my_metadata": {
    "ClientId": "333",
    "Type": "Test File",
    "FileName": "test asd as.pdf",
    "UploadDate": "2018-03-17T04:29:19.700Z",
    "AccessGroups": "[19453,345345]",
    "StateId": "0",
    "State": "null",
    "TypeId": "56ea423345345d8c3908e019944e",
    "Links": "[]",
    "Id": "56ea43453455508e0199453",
    "LastActionDate": "null",
    "WorkflowVersion": "0"
  }
}

I want to store the document's '_id' field as :sql_last_value and then use that value in the statement above.
Currently, from the log, it is using 0 for the last value:

var lastId = '0'; db.edb2.find({ _id: { $gte : lastId }})
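As I understand it, the plugin just substitutes the placeholder text into the statement before running it. A rough Ruby sketch of that substitution (an illustration, not the plugin's actual code):

```ruby
# Hypothetical sketch: the :sql_last_value placeholder is replaced textually
# in the statement before the query runs. With tracking broken, the stored
# last value stays at 0, so the query always starts from 0.
statement = "var lastId = ':sql_last_value'; db.docs.find({ _id: { $gte : lastId }})"
sql_last_value = 0  # initial value when the last-run file holds 0

substituted = statement.gsub(":sql_last_value", sql_last_value.to_s)
puts substituted
# var lastId = '0'; db.docs.find({ _id: { $gte : lastId }})
```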

How do I store a MongoDB field in :sql_last_value? All the examples I can find use a SQL query, which MongoDB doesn't use.

I have tried these with no success:

tracking_column => "_id"
tracking_column => "id"
tracking_column => "document._id"

Thanks for any help.

Further info:

Looking in the last-run file, the contents are:

--- 0

which doesn't change (I expected the last _id value).
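For reference, the last-run file appears to be just the YAML serialization of the current sql_last_value, which is why it sits at `--- 0` while tracking is broken. A minimal sketch:

```ruby
require "yaml"

# The last-run file content is the YAML dump of sql_last_value;
# with the tracking column never found, it stays at its initial 0.
stored = YAML.dump(0)
puts stored  # prints "--- 0"
```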

I'm in the same situation... have you been able to find a solution?

Not yet, I will post here as soon as I work something out.

Currently debugging using

sql_log_level => "debug"

and stdout instead of ES for testing:

output {
  stdout { codec => json_lines }
}

Even trying the other Id field won't work:

clean_run => false
record_last_run => true
last_run_metadata_path => ".logstash_jdbc_last_run"
tracking_column => "_my_metadata.Id"
use_column_value => true

[2020-09-01T18:36:01,875][WARN ][logstash.inputs.jdbc     ][main][5f0e104f7af94b7a18968d961b65caabe683939736ce330445cab1ae9e68542a] tracking_column not found in dataset. {:tracking_column=>"_my_metadata.Id"}

so yeah...

Still got nowhere

Using this query to exclude the _id field and add a new field called mongoid containing a string version of the _id:

statement => 'db.docs.aggregate([{$addFields:{"mongoid":{"$toString":"$_id"},}},{ $unset: ["_id"] }] )'

Cool, this returns documents like this:

{
  "@version": "1",
  "@timestamp": "2020-09-02T02:55:02.107Z",
  "document": {
    "570b0887a39c3917542bada1_patient_id": 23423,
    "570b0887a39c3917542bada1_patient_surname": "asdasd",
    "570b0887a39c3917542bada1_patient_given_name": "sdfsdfsdf",
    "_my_metadata": {

          ...
    },
    "570b0887a39c3917542bada1_gender": "Female",
    "570b0887a39c3917542bada1_document_type": "asdf asd",
    "570b0887a39c3917542bada1_scanning_date": null,
    "570b0887a39c3917542bada1_dob": "1953-05-12T00:00:00.001Z",
    "mongoid": "570b1235a39c3910002bb0df"
  }
}
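What that $addFields/$unset pipeline does, sketched in plain Ruby on a hypothetical document hash (assuming the ObjectId has already been rendered as a string):

```ruby
# Mirror of the aggregation: copy _id into a new string field "mongoid",
# then drop the original _id key. (Hypothetical helper, for illustration.)
def add_mongoid(doc)
  doc.merge("mongoid" => doc["_id"].to_s).reject { |k, _| k == "_id" }
end

doc = { "_id" => "570b1235a39c3910002bb0df", "gender" => "Female" }
p add_mongoid(doc)
# {"gender"=>"Female", "mongoid"=>"570b1235a39c3910002bb0df"}
```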

nice, I will use the new 'mongoid' as the tracking column!

clean_run => false
record_last_run => true
last_run_metadata_path => ".logstash_jdbc_last_run"
tracking_column => "document.mongoid"
use_column_value => true

but, no!

[WARN ][logstash.inputs.jdbc     ][main][f028a6a0c2da3147b1bf3c2fdd6841186ee11e37f6dd69d82db1b1a0eecb343c] tracking_column not found in dataset. {:tracking_column=>"document.mongoid"}

The same error occurs when using:

tracking_column => "document.mongoid"
tracking_column => "mongoid"

Can anyone help with specifying the tracking column for a JDBC mongodb input please?

I ended up using the Elasticsearch output to specify my 'mongoid' field:

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "edb"
    #user => "elastic"
    #password => "changeme"
    doc_as_upsert => true
    document_id => "%{[document][mongoid]}"
  }
}


Got the idea from this brilliant page by Sakina Shaikh.

Yep, I've done this also: pick some unique identifier from the Mongo document as the id for Elasticsearch.

But I really need tracking_column to work, because I need to paginate through all the records in the Mongo collection (around 9M). Without pagination it tries to retrieve everything into memory, so it's exploding in my face. :sweat_smile:

My config file looks like this:

input {
  jdbc {
    jdbc_driver_library => "/etc/logstash/jars/mongojdbc2.3.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb+srv://user:pass@host/db"
    jdbc_user => ""
    schedule => "* * * * *"
    tracking_column => "updatedat"
    use_column_value => true
    statement => "db.getCollection('collection').find({ updatedat: { $gte: :sql_last_value}},{'_id':0})"
    clean_run => true
  }
}

output {
  amazon_es {
    hosts => ["https://elastichost:443"]
    region => "eu-west-1"
    index => "collectionindex"
    document_type => "collectionindex"
    document_id => "%{[document][mediaId]}"
  }
  stdout {
    codec => rubydebug
  }
}

In our dev environment, without a tracking column, this works well because we only have 1.7k documents in the collection, which all fit in memory, and the query runs every minute picking up only new documents.

But this is not valid for the prod environment, because I need that sql_last_value to track tracking_column => updatedat, and to paginate the query with a limit (and sorting, obviously).

I'm trying to debug the jdbc.rb code in the logstash-input-jdbc plugin to check what's going on. The problem comes from this method:

public
def get_column_value(row)
  if !row.has_key?(@tracking_column.to_sym)
    if !@tracking_column_warning_sent
      @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
      @tracking_column_warning_sent = true
    end
    # If we can't find the tracking column, return the current value in the ivar
    @sql_last_value
  else
    # Otherwise send the updated tracking column
    row[@tracking_column.to_sym]
  end
end
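A quick way to see why that `has_key?` check fails with this driver: the rows come back nested under a :document key, so a flat lookup on the tracking column never matches. A minimal reproduction with a hypothetical row hash (field name and value are made up):

```ruby
# The MongoDB JDBC driver hands each row back nested under :document,
# so the plugin's top-level has_key? check on the tracking column misses it.
row = { document: { "updatedat" => "2020-09-01T00:00:00Z" } }

row.has_key?(:updatedat)         # => false  (this is what triggers the warning)
row.dig(:document, "updatedat")  # => "2020-09-01T00:00:00Z"
```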

I'll keep debuging to check what's going on...

I've used this plugin with MySQL and it works well... maybe with a document-model database like Mongo, the hierarchy in its documents is the real problem.

thanks anyway


Well, I've changed the Ruby code of the JDBC plugin to this:

public
def get_column_value(row)
  if !row.has_key?(@tracking_column.to_sym)
    if !row.dig(:document, @tracking_column).nil?
      @sql_last_value = row.dig(:document, @tracking_column)
    elsif !@tracking_column_warning_sent
      @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
      @tracking_column_warning_sent = true
    end
    # If we can't find the tracking column, return the current value in the ivar
    @sql_last_value
  else
    # Otherwise send the updated tracking column
    row[@tracking_column.to_sym]
  end
end

The query returns documents like {:document => { ...dataOfMongo... }}, so in the code, if the tracking column is not at the first level (as it is with MySQL), we have to search for that key at the second level of the row; that's why I added this if:

    if !row.dig(:document, @tracking_column).nil?
      @sql_last_value = row.dig(:document, @tracking_column)

If it's not nil, use the value of the tracking column as sql_last_value, as it should be.

Now, all I have to do is edit the statement to sort by that column and limit the query, like this:

statement => "db.getCollection('collection').find({ updatedat: { $gte: :sql_last_value}},{'_id':0}).sort({updatedat:1}).limit(1000)"

And every minute it will fetch up to 1000 new documents (if they exist) since sql_last_value; otherwise, sql_last_value will remain the same.
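The effect of that sort/limit statement, simulated in Ruby over an in-memory collection (hypothetical data; each "run" fetches the next page and advances sql_last_value):

```ruby
# Simulate incremental pagination: each scheduled run takes up to page_size
# documents with updatedat >= sql_last_value, sorted ascending, then
# advances sql_last_value to the last updatedat seen.
docs = (1..25).map { |i| { "updatedat" => i } }
sql_last_value = 0
page_size = 10

3.times do
  page = docs.select { |d| d["updatedat"] >= sql_last_value }
             .sort_by { |d| d["updatedat"] }
             .first(page_size)
  break if page.empty?
  sql_last_value = page.last["updatedat"]
  puts "fetched #{page.size} docs, sql_last_value now #{sql_last_value}"
end
```

Note that because the query uses $gte, the boundary document is fetched again on the next run; the doc_as_upsert/document_id combination in the output makes that duplication harmless.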

Hope it helps.


I spent today trying to compile the JDBC input plugin with the changes Toni provided but got nowhere (many issues with gem, bundler, and Java; even the Logstash folks on Freenode #logstash couldn't help): Could not find gem 'logstash-devutils' in any of the gem sources listed in your Gemfile, etc.

Finally found an out-of-the-box solution:

https://rwynn.github.io/monstache-site/

It works brilliantly and uses MongoDB change streams instead of legacy oplog tailing. It also handles the _id ObjectId field without any fuss. Logstash is pretty good, but this solution is just better.

Thanks everyone for your help, and Toni: you're a legend.

~S

TL;DR: don't bother using Logstash for MongoDB input unless you like rewriting drivers. Just use Monstache; it works out of the box:

https://rwynn.github.io/monstache-site/