Is there a way to index data from MongoDB into Elasticsearch? I've searched a bit but haven't found a MongoDB input for Logstash.
Specifically, I'd like to index part of a MongoDB collection by running an aggregation query and storing the result in Elasticsearch.
Hi @stephane_chan,
There is a jdbc_input plugin that would help you ingest data from a database using MongoDB JDBC drivers. Have you looked to see if this will work for you?
Hi @carly.richmond,
I've already looked at this, but I couldn't connect to the MongoDB database. I used the JDBC driver for MongoDB (Maven Central: org.mongodb:mongodb-jdbc:2.0.2), but it didn't work.
Here's my Logstash configuration:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mongodb-jdbc-2.0.2.jar"
    jdbc_driver_class => "com.mongodb.jdbc.MongoDriver"
    jdbc_connection_string => "jdbc:mongodb://<host>:27017/<db_name>"
    jdbc_user => "xxx"
    jdbc_password => "xxx"
    statement => "
      db.comments.aggregate([
        { $match: {
          media_type: 'audio',
          is_active: 1
        } },
        { $group: {
          _id: '$media_id',
          count: { $sum: 1 }
        } }
      ])
    "
  }
}
filter {
  mutate {
    remove_field => ["@timestamp"]
    rename => {
      "count" => "comment_count"
      "media_id" => "id"
    }
  }
  ruby {
    code => "event.set('comment_count', event.get('comment_count') || 0)"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
    script => 'ctx._source.comment_count = params["event"]["comment_count"]'
  }
  stdout { }
}
And I get this error:
[INFO ] 2023-06-23 12:14:47.416 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>0.7}
[INFO ] 2023-06-23 12:14:47.536 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2023-06-23 12:14:47.596 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
warning: thread "[main]<jdbc" terminated with exception (report_on_exception is true):
java.lang.NoClassDefFoundError: org/aspectj/lang/Signature
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(java/lang/Class.java:398)
at org.jruby.javasupport.JavaSupportImpl.loadJavaClass(org/jruby/javasupport/JavaSupportImpl.java:157)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)
at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.load_jdbc_driver_class(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:82)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.load_driver(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:36)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.inputs.jdbc.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/inputs/jdbc.rb:320)
at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:410)
at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_input(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:401)
at org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)
at java.lang.Thread.run(java/lang/Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.aspectj.lang.Signature
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.jruby.javasupport.JavaSupportImpl.loadJavaClass(JavaSupportImpl.java:157)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:426)
at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:293)
at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24)
at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86)
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.RUBY$method$load_jdbc_driver_class$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:82)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.RUBY$method$load_jdbc_driver_class$0$__VARARGS__(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:77)
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.RUBY$method$load_driver$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:36)
OK, a NoClassDefFoundError can sometimes be caused by a driver version mismatch or an invalid path, so I would double-check the version and folder location as per this thread.
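One way to narrow this down is to check whether the class named in the trace (org.aspectj.lang.Signature) is present in any of the jars Logstash loads. The snippet below is a minimal sketch of that check, not an official diagnostic: the helper names jar_has_class and missing_classes are made up for illustration, and it only inspects jar contents without loading anything into a JVM.

```python
import zipfile


def jar_has_class(jar_path, class_name):
    """Return True if the jar holds the compiled class, e.g. 'org.aspectj.lang.Signature'."""
    # A class a.b.C is stored inside the jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def missing_classes(jar_paths, class_names):
    """List the class names that none of the given jars provide."""
    return [
        name
        for name in class_names
        if not any(jar_has_class(path, name) for path in jar_paths)
    ]


if __name__ == "__main__":
    import sys

    # Usage (hypothetical): python check_jars.py org.aspectj.lang.Signature a.jar b.jar
    wanted, jars = sys.argv[1], sys.argv[2:]
    print(missing_classes(jars, [wanted]))
```

If the class shows up as missing for the driver jar from the configuration, that would suggest a dependency is absent from the classpath rather than a wrong path to the driver itself.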
Otherwise, depending on the version of Elasticsearch you are looking to ingest into, I can think of a couple of other options:
- There is a beta of the MongoDB client connector, usable with Python and available as of 8.4, that you could leverage. It is available on the free tier.
- Do you have an Elastic license, and are you running 8.5 or later? Another option would be the MongoDB native connector, but that feature requires a Platinum license or above.
- The exec input plugin, which is discussed in this old thread.
Do let us know how you get on!
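For the exec input option, a script along these lines could run the aggregation outside Logstash and print one JSON document per line. This is a minimal sketch under assumptions, not a tested pipeline: it assumes pymongo is installed, it reuses the comments collection and the aggregation from the configuration earlier in the thread, and the names to_event, emit and main are hypothetical. How the exec input turns the script's output into events depends on the codec configured there, so that part would need checking.

```python
import json
import sys

# Same aggregation as in the Logstash configuration earlier in the thread.
PIPELINE = [
    {"$match": {"media_type": "audio", "is_active": 1}},
    {"$group": {"_id": "$media_id", "count": {"$sum": 1}}},
]


def to_event(row):
    """Map one aggregation row to the flat document to index."""
    # $group puts the grouping key in _id; expose it as id and default the count to 0.
    return {"id": str(row["_id"]), "comment_count": int(row.get("count") or 0)}


def emit(rows, out=sys.stdout):
    """Write one JSON object per line and return how many lines were written."""
    written = 0
    for row in rows:
        out.write(json.dumps(to_event(row)) + "\n")
        written += 1
    return written


def main(uri, db_name):
    # Assumption: pymongo is available. It is imported here so the helpers
    # above stay free of third-party dependencies.
    from pymongo import MongoClient

    client = MongoClient(uri)
    try:
        emit(client[db_name]["comments"].aggregate(PIPELINE))
    finally:
        client.close()


if __name__ == "__main__":
    # Usage (hypothetical): python mongo_counts.py "mongodb://<host>:27017" <db_name>
    main(sys.argv[1], sys.argv[2])
```

Keeping the row mapping in to_event means the rename and default-to-zero steps from the filter section happen in the script, so the Logstash side would only need to parse JSON and write to Elasticsearch.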
I found another JDBC driver for MongoDB (UnityJDBC - JDBC Driver for MongoDB), which uses SQL queries, and I'm using it with a jdbc input. I'm currently on version 7.17.4 of ELK,
and running Logstash manually (logstash -f file.conf) works.
But I think there's a limit on the number of JDBC connections in free mode.