I am migrating data from MongoDB to Elasticsearch using Logstash. I am using the MongoDB JDBC driver jar with the jdbc input plugin.
I have an aggregation query that fetches the records from MongoDB and inserts them into Elasticsearch.
The issue is that only the first record from the top of the MongoDB result is inserted into Elasticsearch; the rest of the documents are not inserted. When I looked at the Logstash server output, I found that the record which was inserted into Elasticsearch is printed multiple times.

.conf file:
# Logstash pipeline: runs a MongoDB aggregation through the jdbc input plugin
# (DbSchema MongoDB JDBC driver) and indexes every returned row into
# Elasticsearch, echoing each event to stdout for debugging.
input {
  jdbc {
    # NOT THIS # jdbc_driver_class => "Java::mongodb.jdbc.MongoDriver"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_driver_library => "D:/ELK/jarfile/mongojdbc3.0.jar"
    jdbc_user => "" #no user and pwd
    jdbc_password => ""
    use_column_value => true
    # tracking_column takes a plain column name, not an event sprintf
    # reference such as "%{[application_id]}".
    # NOTE(review): the statement below never references :sql_last_value, so
    # the tracked value is currently unused; the printed events also nest the
    # fields under [document], so confirm this column name is visible to the
    # plugin before relying on it.
    tracking_column => "application_id"
    # Single connection string on one line. The original paste had this
    # literal broken across lines by a stray "Blockquote" marker and was
    # followed by a second, empty jdbc_connection_string setting.
    jdbc_connection_string => "jdbc:mongodb://user:pass@mongodb.sandboxforbank.com:27017/Feature-QA-4?ssl=true"
    # Aggregation, kept verbatim: match ACTIVE applicants, $lookup
    # oaoproductdetails and applicationtasksconfigs, $unwind both joins,
    # $project only application_id and am_accountNo, then $limit 5.
    # NOTE(review): each $unwind emits one row per joined element and the final
    # $project drops every joined field, so rows that belong to the same
    # applicant come out identical. Those rows also resolve to the same
    # document_id in the elasticsearch output and overwrite each other.
    # Presumably this is why one record is printed five times and only one
    # document is indexed -- confirm by running the pipeline in the mongo shell.
    statement => " db.oaoapplicants.aggregate([ { $match: { _applicationStatus: 'ACTIVE', } }, { '$lookup': { 'from': 'oaoproductdetails', let: { product_code: '$product_code' }, pipeline: [ { $match: { $expr: { $and: [ { $eq: [ '$product_code', '$$product_code' ] } ] } } }, { $project: { _id: 0 } } ], 'as': 'productDetails' } },
{
$unwind : {
path : '$productDetails',
preserveNullAndEmptyArrays: true
}
},
{
'$lookup': {
'from': 'applicationtasksconfigs',
let: {
application_id: '$application_id'
},
pipeline: [
{
$match: {
$expr: {
$and: [
{
$eq: [
'$application_id',
'$$application_id'
]
},
{
$ne: [
'$assignedTo.username',
'Unassigned'
]
},
{
$eq: [
'$deleted',
false
]
}
]
}
}
}, {
$project: {
_id: 0,
'operationalLevelAgreement.overall._id':0,
'operationalLevelAgreement.queued._id':0,
'operationalLevelAgreement.assigned._id':0,
}
}
],
'as': 'task'
}
},
{
$unwind :{
path : '$task',
preserveNullAndEmptyArrays: true
}}, { $project: { '_id': 0, 'application_id': '$application_id', 'am_accountNo': '$am_accountNo' } },
{
$limit : 5
}
])
"
  }
}

output {
  # Print every event so it can be compared with what gets indexed.
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "abc_index3"
    # The printed events carry the row fields under [document], so the id is
    # read from there; events with the same application_id share one document.
    document_id => "%{[document][application_id]}"
  }
}

Output in the Logstash server:
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.952Z
}
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.933Z
}
[2021-02-15T18:10:06,249][INFO ][org.mongodb.driver.connection][main][b1604bb1b4a7cd7c375134ae04cfb5db329957ef22629f3eed0409196d19df6a] Closed connection [connectionId{localValue:2, serverValue:24532}] to mongodb.sandboxforbank.com:27017 because the pool has been closed.
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.949Z
}
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.949Z
}
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.950Z
}
[2021-02-15T18:10:07,633][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2021-02-15T18:10:07,824][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2021-02-15T18:10:07,882][INFO ][logstash.runner ] Logstash shut down.
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.