MongoDB to Elasticsearch?

Is there a way to index data from mongoDB to elasticsearch? I've searched a bit but I haven't found any mongodb input on logstash
i.e. part of the mongoDB collection by making an aggregation query and then storing the result in elasticsearch ?.

Hi @stephane_chan,

There is a jdbc_input plugin that would help you ingest data from a database using MongoDB JDBC drivers. Have you looked to see if this will work for you?

Hi @carly.richmond,

I've already looked at this, but I couldn't connect to the mongodb base I took the jdbc for mongodb (Maven Central: org.mongodb:mongodb-jdbc:2.0.2) but it didn't work.

Here's my logstash configuration:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mongodb-jdbc-2.0.2.jar"
    jdbc_driver_class => "com.mongodb.jdbc.MongoDriver"
    jdbc_connection_string => "jdbc:mongodb://<host>:27017/<db_name>"
    jdbc_user => "xxx"
    jdbc_password => "xxx"
    statement => "
    db.comments.aggregate([
      { $match: { 
          media_type: 'audio', 
          is_active: 1 
      } }, 
      { $group: { 
        _id: '$media_id', 
        count: { $sum: 1 } 
      } }
    ])
    "
  }
}


filter {
  mutate {
    remove_field => ["@timestamp"]
    rename => {
      "count" => "comment_count"
      "media_id" => "id"
    }
  }
  ruby {
    code => "event.set('comment_count', event.get('comment_count') || 0)"
  }
}

output {
   elasticsearch {
        hosts => ["localhost:9200"]
        index => "test"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
        script => 'ctx._source.comment_count = params["event"]["comment_count"]'

    }
    stdout { }
}

and i get this error:

[INFO ] 2023-06-23 12:14:47.416 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>0.7}
[INFO ] 2023-06-23 12:14:47.536 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2023-06-23 12:14:47.596 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
warning: thread "[main]<jdbc" terminated with exception (report_on_exception is true):
java.lang.NoClassDefFoundError: org/aspectj/lang/Signature
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(java/lang/Class.java:398)
	at org.jruby.javasupport.JavaSupportImpl.loadJavaClass(org/jruby/javasupport/JavaSupportImpl.java:157)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)
	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:426)
	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:293)
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.load_jdbc_driver_class(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:82)
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.load_driver(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:36)
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.inputs.jdbc.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/inputs/jdbc.rb:320)
	at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:410)
	at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_input(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:401)
	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)
	at java.lang.Thread.run(java/lang/Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.aspectj.lang.Signature
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at org.jruby.javasupport.JavaSupportImpl.loadJavaClass(JavaSupportImpl.java:157)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:426)
	at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:293)
	at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:24)
	at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:86)
	at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.RUBY$method$load_jdbc_driver_class$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:82)
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.RUBY$method$load_jdbc_driver_class$0$__VARARGS__(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:77)
	at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)
	at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)
	at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_integration_minus_jdbc_minus_5_dot_2_dot_6.lib.logstash.plugin_mixins.jdbc.common.RUBY$method$load_driver$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.2.6/lib/logstash/plugin_mixins/jdbc/common.rb:36)

Ok, the NoClassDefFound can sometimes be caused by a driver version mismatch or an invalid path, so I would double check the version and folder location as per this thread.

Otherwise, depending on the version of Elasticsearch you are looking to ingest into I can think of a couple of other options:

  1. There is a beta of the MongoDB client connector that can be used with Python available as of 8.4 that you could leverage which is available on the free tier.
  2. Do you have a license for Elastic at all and running 8.5 or later? Another option would be to take a look at the MongoDB native connector, but it is a feature available for platinum license and above.
  3. Exec input plugin which is discussed in this old thread.

Do let us know how you get on!

I found another jdbc for mongo (UnityJDBC - JDBC Driver for MongoDB) , which uses SQL queries, with a jdbc input. I'm currently using version 7.17.4 ELK
and I've manually run logstash (logstash -f file.conf) which works.

But I think there's a limit on the number of jdbc connections in free mode.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.