Ruby filter - How to access some elastic index data from a ruby filter

I'm trying to have Logstash add a field computed from a value in the incoming event and data that I have in another index or an external parameter.

How can I access data from an Elasticsearch index in the Ruby code?

This is what I'm trying to do:

ruby {
    code => "event.set('new_field', event.get('score').to_i * another_index.config.rate)"
}

Where 'another_index.config.rate' should be data from an Elasticsearch index outside the scope of the Logstash import.

I actually tried performing an HTTP GET request to Elasticsearch from the Ruby code and it seems to work, but this doesn't feel right if it's making a GET request for every one of the millions of records that Logstash is importing. This is the code I'm using:

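require 'net/http'
require 'json'
# Fetch the config document on every event (this is the part that worries me)
uri = URI.parse('http://localhost:9200/test/config/1')
response = Net::HTTP.get_response(uri)
if response.code == '200'
  result = JSON.parse(response.body)
  rate = result['_source']['rate']
  event.set('new_field', event.get('Installs').to_i * rate.to_i)
else
  # Lookup failed: fall back to a default value
  event.set('new_field', '0')
end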

What's the right way to achieve this?

Thanks in advance

This is what the elasticsearch filter plugin does, but this adds a lot of overhead and reduces throughput as you correctly point out. That will be the case even if you do it through Ruby code.

If you have a limited set of data you are looking up that does not change frequently, you could put it in a file and use the translate filter plugin, which stores the data in memory and therefore has a significantly smaller effect on throughput.

Might the jdbc_streaming filter help somehow by reusing the connection?
There must be an efficient way of doing this, as it looks to me like a very common scenario.

Thanks a lot

If you have the data in a database, the jdbc streaming plugin is certainly an option.

About the translate filter recommendation: it could work, but I actually need data that comes from a database. I could eventually consume that data from MongoDB instead of Elasticsearch if that helps somehow.

And lastly, in case there is no way other than my code above, my second question is: what happens when Logstash hits Elasticsearch with that many requests? How does Elasticsearch react to that?

Issuing a lot of queries will put extra load on the cluster, so I would avoid this for streams with high throughput rates.

For this to be efficient you need a filter that can cache client-side, and unfortunately I do not think the Elasticsearch filter does that yet. There is an open issue, which had some comments not too far back. Maybe @guyboertje can provide some further details?
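To illustrate what client-side caching means here, below is a minimal sketch in plain Ruby. The class name CachedLookup, the TTL of 60 seconds and the stand-in rate value are all hypothetical; this is not part of Logstash or of any plugin. The expensive lookup is passed in as a block and only runs again once the cached value has expired.

```ruby
# Hypothetical helper, not part of Logstash or any plugin.
class CachedLookup
  # ttl_seconds: how long a fetched value is reused
  # clock:       callable returning the current time in seconds (injectable for tests)
  # fetcher:     block that performs the expensive lookup (for example an HTTP GET)
  def initialize(ttl_seconds,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 &fetcher)
    raise ArgumentError, 'a fetcher block is required' unless fetcher

    @ttl = ttl_seconds
    @clock = clock
    @fetcher = fetcher
    @mutex = Mutex.new # several worker threads may call get concurrently
    @value = nil
    @expires_at = nil
  end

  # Returns the cached value, calling the fetcher only when the cache is
  # empty or older than the TTL.
  def get
    @mutex.synchronize do
      now = @clock.call
      if @expires_at.nil? || now >= @expires_at
        @value = @fetcher.call
        @expires_at = now + @ttl
      end
      @value
    end
  end
end

# Usage: the block stands in for the real lookup against Elasticsearch.
lookups = 0
rate_cache = CachedLookup.new(60) do
  lookups += 1
  5 # assumed rate value, for illustration only
end

new_fields = [10, 20, 30].map { |score| score * rate_cache.get }
puts "computed #{new_fields.inspect} with #{lookups} lookup(s)"
```

In a ruby filter, an object like this could presumably be created once in the filter's init option and reused from the code option, so that millions of events share a handful of lookups. The trade-off is that a changed rate is only picked up after the TTL expires.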

You really don't want to make an HTTP call to Elasticsearch in the ruby filter. You will need to handle failures etc.
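For a sense of what "handle failures" involves, here is a rough sketch in plain Ruby. The helper names fetch_rate and rate_from_body, the default value and the 2 second timeout are assumptions made for illustration. The document layout ('_source' containing 'rate') is taken from the code in the question. Anything that goes wrong (bad URL, refused connection, timeout, non-2xx status, malformed JSON, missing field) falls back to the default.

```ruby
require 'net/http'
require 'json'
require 'uri'

# Hypothetical helper: extracts the rate from an Elasticsearch GET response body.
# Raises if the body is not valid JSON or the field is missing.
def rate_from_body(body)
  Integer(JSON.parse(body).dig('_source', 'rate'))
end

# Hypothetical helper: returns the rate as an Integer, or `default` on any failure.
def fetch_rate(url, default: 0, timeout: 2)
  uri = URI.parse(url)
  response = Net::HTTP.start(uri.host, uri.port,
                             open_timeout: timeout, read_timeout: timeout) do |http|
    http.get(uri.request_uri)
  end
  return default unless response.is_a?(Net::HTTPSuccess)

  rate_from_body(response.body)
rescue StandardError
  # Covers connection errors, timeouts, invalid URLs and parse errors.
  default
end

# Usage: falls back to the default if Elasticsearch is not reachable.
rate = fetch_rate('http://localhost:9200/test/config/1', default: 0)
puts "rate used for this event: #{rate}"
```

Even with this in place, every event still pays for a network round trip, which is why the options below are preferable for high-volume streams.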

If your data is (or can be) in a JDBC accessible database then JDBC Streaming is your best bet.

Another option is to run a second LS pipeline that takes the data out of ES with an ES input plugin and writes it to a KV or JSON file that is used by the translate filter in the first LS pipeline.

Is this what you mean in your last comment?

I think this could be very error prone. As it is not possible to control when the translate plugin will refresh, it could end up refreshing halfway through the file being written. I would probably prefer having a script prepare the file and then just replace it when complete.
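As a sketch of the "prepare the file, then replace it when complete" approach, the Ruby script below writes the dictionary to a temporary file and renames it over the target. The helper name write_dictionary_atomically, the file name rates.json and the entries are hypothetical. The sketch relies on rename being atomic on POSIX filesystems when both paths are on the same filesystem, so a reader sees either the old file or the new one, never a half-written one.

```ruby
require 'json'
require 'tmpdir'

# Hypothetical helper: writes `entries` as JSON to `path` without ever
# exposing a partially written file at `path`.
def write_dictionary_atomically(path, entries)
  # Temporary file in the same directory, so the rename stays on one filesystem.
  tmp_path = "#{path}.tmp.#{Process.pid}"
  File.open(tmp_path, 'w') do |f|
    f.write(JSON.generate(entries))
    f.flush
    f.fsync # make sure the data is on disk before the swap
  end
  File.rename(tmp_path, path) # replaces the old file in one step
ensure
  # Clean up the temporary file if something failed before the rename.
  File.delete(tmp_path) if tmp_path && File.exist?(tmp_path)
end

# Usage: in practice the entries would come from the database or index.
dictionary_path = File.join(Dir.tmpdir, 'rates.json')
write_dictionary_atomically(dictionary_path, { 'rate' => 5 })
puts File.read(dictionary_path)
```

A script like this could be run from cron or similar, with the translate filter pointed at the resulting file.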

I hear you. So it looks to me so far that the JDBC Streaming option is the safer one, isn't it?

Actually, I don't think the second pipeline idea works as the file will not be written once at the end of the ES input data collection run.

Wait, I'm confused. So the Elasticsearch filter plugin isn't a good fit for this?


Does it make a request for every record imported?

I'm actually having this issue:

"Error: java::mongodb.jdbc.MongoDriver not loaded"

And couldn't yet find any solution.

This is my config:

    jdbc_streaming {
        jdbc_driver_library => "/Applications/UnityJDBC/mongodb_unityjdbc_full.jar"
        jdbc_driver_class => "java::mongodb.jdbc.MongoDriver"
        jdbc_connection_string => "jdbc:mongodb://localhost:27018/mydb"
        statement => "select value from app_values where setting = 'rate'"
        target => "rate"
        add_field => {"yeah" => "This is my rate: %{rate}"}
    }

Any direction?

jdbc_driver_class => "java::mongodb.jdbc.MongoDriver" is wrong.

Have a look at

You're right. However, I've also tried what they wrote in their documentation.

jdbc_connection_string => "jdbc:mongodb://localhost:27018/mydb"

Wasn't right, but I also tried with their "version" and I get the same error:

Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<Sequel::AdapterNotFound: 
java::mongodb.jdbc.MongoDriver not loaded>, :backtrace=> 
["/Users/akapit/workspace/outflink/outflink_platform/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/sequel-5.7.1/lib/sequel/adapters/jdbc.rb:44:in `load_driver'", 

Has anybody here made this work?


Well, it is true that we don't have an adapter for MongoDB. AFAIR, when a specific adapter is not found a generic one is used. You may have trouble with the generic adapter because it does vanilla SQL only.

Is this what you are trying?

    jdbc_driver_class => "mongodb.jdbc.MongoDriver"
    jdbc_connection_string => "jdbc:mongo://localhost:27018/mydb"

The above is from the link I gave.

Yes, that's what I'm trying.

Post your config please