Indexer Mongodb dans ES en temps réel

Bonjour à tous,

Mon besoin est d'indexer en temps réel une base mongodb (plusieurs collections) dans Elasticsearch (en leur appliquant qlq transformation au passage : formatage des geo_point par exemple).
J'ai essayé avec Logstash mais le plugin input de mongodb n'est pas officiel pour ES si j'ai bien compris et je ne sais pas comment faire de la synchro régulière et temps réel (ou presque).

Vos avis svp :slight_smile:
Jalil

L'idéal serait de pousser la donnée vers les deux systèmes à la fois. Donc depuis l'application source.

Où envoyer vers un système de messagerie. Puis lire avec logstash et pousser vers les deux systèmes...

Merci David,

je vais explorer ta proposition.

As tu une idée sur le connecteur mongo-connector ?
Si j'utilise ce connecteur, est ce que je peux faire une transformation sur les data au passage ? (ex. : transformer un objet de coordonnées GPS en Geo_Point) ?

Merci encore une fois.

Je ne connais pas cet outil.

Pour les transformations, il existe de toute façon les ingest pipelines dans elasticsearch qui permet de transformer la donnée avant qu'elle ne soit réellement indexée.

Mais ce n'est peut-être pas nécessaire et peut-être est-ce seulement un souci de mapping dans ton cas.

Bonsoir,

En effet, je pense que c'est un souci de mapping.
Comment je peux mapper l'objet source ci-dessous pour obtenir un geo_point compréhensible par ES ?
image

Peux-tu stp fournir un exemple de ce document en format json ?
Pour un besoin simulaire, j'ai eu à utiliser le plugin JDBC avec le driver DBSchema
Regardez ce thread sur Stackoverflow.
Mais les 2 approches sont juste des alternatives non supportées officiellement par la communuaté Elastic :slight_smile:

Voici un objet JSON. L'idée est de les indéxer dans ES et réussir à mapper les objets coordinates en geo_point.
Dans un objet, j'ai principalement un "start" et un "end" avec chacun ses coordonnées GPS.

{ 
    "_id" : ObjectId("5c7d4c41623e0c159c6717d4"), 
    "driver_id" : "", 
    "driver_name" : "", 
    "fleet_id" : "", 
    "organization_id" : "15", 
    "start" : {
        "in" : {
            "name" : "à 3,648 de Ghannouch", 
            "type" : "Point", 
            "coordinates" : [
                10.05514, 
                33.909508
            ]
        }, 
        "address" : {
            "name" : "الفحص الفني", 
            "street" : "Route Nationale 1", 
            "city" : "Ghanouch", 
            "zipcode" : "6031", 
            "country" : "Tunisie"
        }
    }, 
    "started_at" : ISODate("2019-03-04T18:20:27.000+0000"), 
    "updated_at" : ISODate("2020-11-26T13:21:04.668+0000"), 
    "vehicle_id" : "213", 
    "vrn" : "Scania 9139TU152", 
    "avgConsump" : null, 
    "distance" : 9.3, 
    "end" : {
        "_id" : ObjectId("5c7d7270623e0c7d742eeca3"), 
        "in" : {
            "name" : "à 2,299 de Gabès", 
            "type" : "Point", 
            "coordinates" : [
                10.090872, 
                33.863937
            ]
        }, 
        "address" : {
            "name" : "Sidi Boulbaba", 
            "street" : null, 
            "city" : "Gabes Sud", 
            "zipcode" : "6033", 
            "country" : "Tunisie"
        }, 
        "stop" : {
            "started_at" : ISODate("2019-03-04T18:45:48.000+0000"), 
            "ended_at" : ISODate("2019-03-05T02:30:32.000+0000"), 
            "duration" : NumberInt(27840), 
            "idling_duration" : null, 
            "idling" : [

            ]
        }
    }, 
    "ended_at" : ISODate("2019-03-04T18:45:48.000+0000"), 
    "fuel_consumption" : null, 
    "trip_duration" : NumberInt(1521)
}

Typiquement ton index doit avoir 2 champs de type geo_point

start_position

Et

end_position

Les traformations suivantes sont nécessaires avant indexation

start.in.coordinates[0] => start_position.lat
start.in.coordinates[1] => start_position.lon

end.in.coordinates[0] => end_position.lat
end.in.coordinates[1] => end_position.lon

Merci pour ton retour :slight_smile:

il ne faut pas aussi le convertir en float ?

Je vais tester avec logstash et je te tiendrai au courant.

Oui effectivement il faut les convertir en float :slight_smile:

Quelque chose comme cette config devrait marcher avec ton exemple

input {
  jdbc {
    jdbc_driver_library => "/opt/driver_mongodb/mongojdbc3.0.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://root:changeme@localhost:27017/demo_db?authSource=admin"
    jdbc_user => "root"
    jdbc_password => "changeme"
    schedule => "*/30 * * * * *"
    statement => "db.documents_demo.find()"
  }
}


filter {


	mutate {
			copy => { "[document][start][in][coordinates][0]" => "[start_position][lat]" }
			copy => { "[document][start][in][coordinates][1]" => "[start_position][lon]" }
			copy => { "[document][end][in][coordinates][0]" => "[end_position][lat]" }
			copy => { "[document][end][in][coordinates][1]" => "[end_position][lon]" }
	}
	
	mutate {
        convert => {
          "[start_position][lat]" => "float"
          "[start_position][lon]" => "float"
          "[end_position][lat]" => "float"
          "[end_position][lon]" => "float"
        }
    }
}



output {
  stdout {
    codec => rubydebug
  }
}

Le output ressemble à ca

{
    "start_position" => {
        "lat" => 10.05514,
        "lon" => 33.909508
    },
      "end_position" => {
        "lat" => 10.090872,
        "lon" => 33.863937
    },
          "document" => {
               "driver_id" => "",
                "ended_at" => "2019-03-04T18:45:48.000+0000",
                   "start" => {
                 "in" => {
                       "type" => "Point",
                       "name" => "à 3,648 de Ghannouch",
                "coordinates" => [
                    [0] 10.05514,
                    [1] 33.909508
                ]
            },
            "address" => {
                "zipcode" => "6031",
                "country" => "Tunisie",
                   "city" => "Ghanouch",
                 "street" => "Route Nationale 1",
                   "name" => "????? ?????"
            }
        },
        "fuel_consumption" => 1230,
                     "end" => {
            "address" => {
                "zipcode" => "6033",
                "country" => "Tunisie",
                   "city" => "Gabes Sud",
                 "street" => nil,
                   "name" => "Sidi Boulbaba"
            },
                "_id" => "5c7d7270623e0c7d742eeca3",
               "stop" => {
                       "duration" => 27840,
                         "idling" => [],
                       "ended_at" => "2019-03-05T02:30:32.000+0000",
                "idling_duration" => 450,
                     "started_at" => "2019-03-04T18:45:48.000+0000"
            },
                 "in" => {
                       "type" => "Point",
                       "name" => "à 2,299 de Gabès",
                "coordinates" => [
                    [0] 10.090872,
                    [1] 33.863937
                ]
            }
        },
                "fleet_id" => "",
                     "vrn" => "Scania 9139TU152",
              "updated_at" => "2020-11-26T13:21:04.668+0000",
                     "_id" => "5c7d4c41623e0c159c6717d4",
              "vehicle_id" => "213",
                "distance" => 9.3,
              "started_at" => "2019-03-04T18:20:27.000+0000",
           "trip_duration" => 1521,
         "organization_id" => "15",
             "driver_name" => "",
              "avgConsump" => nil
    },
          "@version" => "1",
        "@timestamp" => 2020-12-21T21:58:03.342Z
}

Merci Yassine pour ta précieuse aide.

Si je comprends bien le mapping sera créé automatiquement par ES ?

Dans l'output de la config, je dois spécifier :

elasticsearch {
                action => "index"
                index => "mongo_trips_data"
                hosts => ["localhost:9200"]
        }

Quelle est la bonne pratique pour le mapping ?
Faut-il créer à l'avance son mapping ou laisse ES créer le mapping avec logstash en forçant le mapping pour le geo_point ?

Le mapping doit etre cree a L'avance

J'ai essayé de créer un template mapping comme suit :
est-ce la bonne manière ?

{
	"mappings": {
		"trips": {
			"properties": {
				"distance": {
					"type": "float"
				},
				"driver_id": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"driver_name": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"end": {
					"properties": {
						"address": {
							"properties": {
								"city": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"country": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"name": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"zipcode": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								}
							}
						},
						"in": {
							"properties": {
								"coordinates": {
									"type": "geo_point"
								},
								"name": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								}
							}
						},
						"place_name": {
							"type": "text",
							"fields": {
								"keyword": {
									"type": "keyword",
									"ignore_above": 256
								}
							}
						},
						"stop": {
							"properties": {
								"duration": {
									"type": "long"
								},
								"ended_at": {
									"type": "date"
								},
								"idling": {
									"type": "long"
								},
								"idling_duration": {
									"type": "long"
								},
								"started_at": {
									"type": "date"
								}
							}
						}
					}
				},
				"ended_at": {
					"type": "date"
				},
				"fleet_id": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"organization_id": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"start": {
					"properties": {
						"address": {
							"properties": {
								"city": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"country": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"name": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"street": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								},
								"zipcode": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								}
							}
						},
						"in": {
							"properties": {
								"coordinates": {
									"type": "geo_point"
								},
								"name": {
									"type": "text",
									"fields": {
										"keyword": {
											"type": "keyword",
											"ignore_above": 256
										}
									}
								}
							}
						},
						"place_id": {
							"type": "text",
							"fields": {
								"keyword": {
									"type": "keyword",
									"ignore_above": 256
								}
							}
						},
						"place_name": {
							"type": "text",
							"fields": {
								"keyword": {
									"type": "keyword",
									"ignore_above": 256
								}
							}
						},
						"stop": {
							"properties": {
								"duration": {
									"type": "long"
								},
								"ended_at": {
									"type": "date"
								},
								"idling": {
									"type": "long"
								},
								"idling_duration": {
									"type": "long"
								},
								"started_at": {
									"type": "date"
								}
							}
						}
					}
				},
				"started_at": {
					"type": "date"
				},
				"trip_duration": {
					"type": "long"
				},
				"updated_at": {
					"type": "date"
				},
				"vehicle_id": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"vrn": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				}
			}
		}
	}
}

Je pense que ca doit marcher
Il y a 3 ou 4 manières de tranformer vers geo_point

Petite question. Est-ce que dans

        "coordinates" : [
            10.05514, 
            33.909508
        ]

10.05514 est la longitude ? Et 33.909508 la latitude ?

Si c'est le cas, je pense que sans transformation, tu peux juste dire que les champs suivants sont des geo_point:

  • start.in.coordinates
  • end.in.coordinates

A moins que j'ai mal lu ce thread.... :slight_smile:

Bonjour,

Oui c'est bien ça : 10.05514 est la longitude Et 33.909508 la latitude.

quelle serait la syntaxe pour ca ?

Salut Yassine,

j'ai fait la config suivante et j'obtiens une exception

input {
  jdbc {
    jdbc_driver_library => "D:\03_ES\drivers\MongoDbJdbcDriver\mongojdbc3.0.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://localhost:27017/atseeone?authSource=admin"
	jdbc_user => ""
    jdbc_password => ""
    schedule => "*/30 * * * * *"
    statement => "db.trips.find({},{'_id': false});"
  }
}
[2020-12-22T13:34:39,539][INFO ][logstash.inputs.jdbc     ][main][f75ea00df924555658db3fa01cdcec39233fa0bc8cd4b27ce19275467c410b3b] (6.061727s) db.trips.find({},{'_id': false});
[2020-12-22T13:34:39,991][WARN ][logstash.inputs.jdbc     ][main][f75ea00df924555658db3fa01cdcec39233fa0bc8cd4b27ce19275467c410b3b] Exception when executing JDBC query {:exception=>"Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.bson.types.ObjectId, simple name=ObjectId"}
[2020-12-22T13:34:40,199][INFO ][org.mongodb.driver.connection][main][f75ea00df924555658db3fa01cdcec39233fa0bc8cd4b27ce19275467c410b3b] Closed connection [connectionId{localValue:2, serverValue:110}] to localhost:27017 because the pool has been closed.

Quelle version de logstash ?
Tu dois avoir l'erreur avec ce statement

statement => "db.trips.find({});"

Mais pas avec ce dernier

statement => "db.trips.find({},{'_id': false});"

J'utilise la version 7.9.2
voici la config complète :

input {
  jdbc {
    jdbc_driver_library => "D:\03_ES\drivers\MongoDbJdbcDriver\mongojdbc3.0.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://localhost:27017/atseeone?authSource=admin"
	jdbc_user => ""
    jdbc_password => ""
    schedule => "*/30 * * * * *"
    statement => "db.trips.find({},{'_id': false});"
  }
}
filter {
}
output {

	elasticsearch {
                action => "index"
                index => "trip"
                hosts => ["localhost:9200"]
        }
	stdout {
		codec => rubydebug
  }
} 

J'ai encore l'exception :frowning: