Data migration from mongodb to elasticsearch

I am doing data migration from mongodb to elasticsearch by using logstash . I am using jdbc mongodb jar as a plugin .
I have aggregation query to get the records from mongodb and insert into elasticsearch .
The issues are it is fetching the first record from top in mongod and inserting into elasticsearch and rest document did not insert into elasticsearch . And when i saw the logstash server and i found that the records which has been inserted into elasticsearch it is printing multiple times .

conf file

input {
jdbc{
# NOT THIS # jdbc_driver_class => "Java::mongodb.jdbc.MongoDriver"
jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
jdbc_driver_library => "D:/ELK/jarfile/mongojdbc3.0.jar"
jdbc_user => "" #no user and pwd
jdbc_password => ""

use_column_value => true

tracking_column => "%{[application_id]}"

jdbc_connection_string => "jdbc:mongodb://user

Blockquote

:pass@mongodb.sandboxforbank.com:27017/Feature-QA-4?ssl=true"

jdbc_connection_string => ""

statement => "             
  								db.oaoapplicants.aggregate([
{
    $match: {
        _applicationStatus: 'ACTIVE',
    }
},
{
    '$lookup': {
        'from': 'oaoproductdetails',
        let: {
            product_code: '$product_code'
        },
        pipeline: [
            {
                $match: {
                    $expr: {
                        $and: [
                            {
                                $eq: [
                                    '$product_code',
                                    '$$product_code'
                                ]
                            }
                        ]
                    }
                }
            },
            {
                $project: {
                    _id: 0
                }
            }
        ],
        'as': 'productDetails'
    }
},

{
$unwind : {
path : '$productDetails',
preserveNullAndEmptyArrays: true
}
},
{
'$lookup': {
'from': 'applicationtasksconfigs',
let: {
application_id: '$application_id'
},
pipeline: [
{
$match: {
$expr: {
$and: [
{
$eq: [
'$application_id',
'$$application_id'
]
},
{
$ne: [
'$assignedTo.username',
'Unassigned'
]
},
{
$eq: [
'$deleted',
false
]
}
]
}
}
}, {
$project: {
_id: 0,
'operationalLevelAgreement.overall._id':0,
'operationalLevelAgreement.queued._id':0,
'operationalLevelAgreement.assigned._id':0,
}
}
],
'as': 'task'
}
},
{
$unwind :{
path : '$task',
preserveNullAndEmptyArrays: true
}

},
{
    $project: {
         '_id': 0,
          'application_id': '$application_id',
  	   'am_accountNo': '$am_accountNo'
  	 }
},

{
$limit : 5
}
])
"

}
}

output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => "http://localhost:9200"
index => "abc_index3"
document_id => "%{[document][application_id]}"
}
}

output in logstash server

{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.952Z
}
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.933Z
}
[2021-02-15T18:10:06,249][INFO ][org.mongodb.driver.connection][main][b1604bb1b4a7cd7c375134ae04cfb5db329957ef22629f3eed0409196d19df6a] Closed connection [connectionId{localValue:2, serverValue:24532}] to mongodb.sandboxforbank.com:27017 because the pool has been closed.
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.949Z
}
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.949Z
}
{
"@version" => "1",
"document" => {
"am_accountNo" => "846176",
"application_id" => "DB2021012131408"
},
"@timestamp" => 2021-02-15T12:40:05.950Z
}
[2021-02-15T18:10:07,633][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2021-02-15T18:10:07,824][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2021-02-15T18:10:07,882][INFO ][logstash.runner ] Logstash shut down.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.