I recently worked a lot about querying and writing values straight into MongoDB 3.6.2 from logstash. I noticed that many people in the forum tried to do the same thing and didn't succeed. I wanted to publish my solution, in case someone else need it. Hope you find this useful. If you find some way to simplify what I did, feel free to comment!
Reading From MongoDB:
In my case, I had to periodically read new data arriving to MongoDB. In addition, I wanted to delay the processing of new events coming in 30 minutes (see more details in the output plugin). That's why I had to use JDBC input plugin. A few more plugins are available, most of them are derived from phutchins plugin. However, I haven't tested them since they didn't suite my needs (this plugin process all the rows, without an option to query them) or no longer maintained.
The JDBC input plugin requires a JDBC library. I have found the following libraries: Unity JDBC, Progress and DB Schema. Some of these libraries have limitations (functional or license limitations).
DB Schema driver is probably the only real full functional Open-Source driver and the only one that really managed to worked for me. Pay attention it only supports read and only the native MongoDB query language (not SQL). The download link (contains 2 files) is found here.
Here's the code reading data from MongoDB:
input {
jdbc {
jdbc_driver_library => "mongojdbc1.2.jar"
jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
jdbc_connection_string => "jdbc:mongodb://DB_ADDRESS:27017/DB_NAME"
jdbc_validate_connection => true
jdbc_user => ""
clean_run => false
record_last_run => true
last_run_metadata_path => "/path/.logstash_jdbc_last_run"
schedule => "*/10 * * * * *"
jdbc_default_timezone => "YOUR_TIMEZONE"
statement =>
"
//well, actually this is JavaScript code. And was written in pure blood :(
var lastValue = :sql_last_value; //the last saved date that was scheduled to run query for
var HALF_HOUR_OFFSET = 1800000;
var extractedDate = lastValue.substring(0,10); //parsing the date of the last saved date
var year = Number(extractedDate.substring(0,4));
var month = Number(extractedDate.substring(5,7));
var day = Number(extractedDate.substring(8,10));
var extractedTime = lastValue.substring(11,23); //parsing the time of the last saved date
var hour = Number(extractedTime.substring(0,2));
var minute = Number(extractedTime.substring(3,5));
var seconds = Number(extractedTime.substring(6,8));
var miliseconds = Number(extractedTime.substring(9,12));
var upperEpoch = new Date(year,month-1,day,hour,minute,seconds,miliseconds).getTime(); //dates start from 0, that's why the minus 1
var bottomEpoch = upperEpoch - HALF_HOUR_OFFSET;
db.DB_COLLECTION.find({ COLLECTION_FIELD: { $gte : bottomEpoch, $lte: upperEpoch} },{'_id': false}); //we omit the 'id' since it irrelevant and also cause deserialize exception
"
}
}
Pay attention to this:
- The logstash is executing a query every 10 seconds, and the query geruntees that only new rows are read. Since there's a a compatibility problem between JAVA ISODate to MongoDB ISODate I had to convert the sql_last_value date string into epoch. The problem is that the sql_last_value is not saved in ISO format contains 'T' and 'Z'. That's why the following query doesn't work:
db.DB_COLLECTION.find({ DOCUMENT_FIELD: { $gt: ISODate(:sql_last_value) }},{'_id': false})"|
- Since the java library doesn't recognize the MongoDB ObjectID type, I have cut if off the ID field from the results. I didn't need it anyway.
- In case you have user and password to your MongoDB, put them inside the connection string and not as part of jdbc_user, jdbc_password fields.
- I haven't succeeded to use the jdbc tracking column. I also didn't need it anyway, but if you succeed please share it.
Writing Into Logstash:
Writing to logstash is simple. You can use the excellent MongoDB Output Plugin. Since I wanted to delay new events coming to MongoDB in 30 minutes, you can see that I use Ruby to add 30 minutes in epoch. That gurentees that logstash which reads data from ElasticSearch will not read the data once is it written into MongoDB.
filter {
json {
source => "message"
}
ruby {
code => 'event.set("db_timestamp", ((Time.now.to_f * 1000).to_i) + 1800000)'
}
mutate {
remove_field => ["path","@version", "message"]
}
}
output {
mongodb{
collection => "DB_COLLECTION"
database => "DB"
uri => "mongodb://DB_ADDRESS:27017/DB_COLLECTION"
}
}
Other alternative way is to deploy some HTTP interface inside MongoDB. That allows you to use http output plugin instead of mongodb output plugin.