Hi,
I'm trying to let Logstash add a field based on a calculation involving the incoming event and data that I have in another index or an external parameter.
How can I access data from any Elasticsearch index in the ruby filter code?
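For illustration, this is roughly what I would like to express in a ruby filter ('another_index.config.rate' is pseudo-code, not real syntax):

filter {
  ruby {
    code => "event.set('new_field', event.get('Installs').to_i * another_index.config.rate)"
  }
}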
Where 'another_index.config.rate' should be data from an Elasticsearch index outside the scope of the Logstash import.
I actually tried performing a GET HTTP request to Elasticsearch from the ruby code and it seems to work, but I feel this is not the right way if it ends up making a GET request for every one of the millions of records that Logstash is importing... This is the code I'm using:

# Needs 'net/http', 'json' and 'uri' (e.g. required once in the filter's init)
uri = URI.parse('http://localhost:9200/test/config/1')
response = Net::HTTP.get_response(uri)
if response.code == '200'
  # Pull the rate out of the looked-up document and combine it with the event
  result = JSON.parse(response.body)
  rate = result['_source']['rate']
  event.set('new_field', event.get('Installs').to_i * rate.to_i)
else
  event.set('new_field', '0')
end
This is what the elasticsearch filter plugin does, but this adds a lot of overhead and reduces throughput as you correctly point out. That will be the case even if you do it through Ruby code.
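For reference, the equivalent lookup with the elasticsearch filter would look roughly like this (hosts, index and field names are placeholders; note the filter sorts on @timestamp by default, so you may need to adjust the sort option for an index without that field):

filter {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test"
    query => "_id:1"
    # Copy the rate field of the matched document onto the current event
    fields => { "rate" => "rate" }
  }
  ruby {
    code => "event.set('new_field', event.get('Installs').to_i * event.get('rate').to_i)"
  }
}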
If the data you are looking up is a limited set that does not change frequently, you could put it in a file and use the translate filter plugin, which stores the data in memory and therefore has a significantly smaller effect on throughput.
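As a sketch (dictionary path and field names are assumptions), that could look like this, with the dictionary file mapping lookup keys to rates:

filter {
  translate {
    field => "config_id"                         # event field whose value is looked up
    destination => "rate"                        # field the looked-up value is written to
    dictionary_path => "/etc/logstash/rates.yml"
    refresh_interval => 300                      # re-read the file every 5 minutes
    fallback => "0"
  }
}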
Might the jdbc_streaming filter help somehow, by reusing the connection and making this better?
There must be an efficient way of doing this, as it looks to me like a very common scenario.
About the translate filter recommendation: it could work, but I actually need data that's coming from a database... I could eventually consume that data from MongoDB instead of Elasticsearch if that helps somehow.
And lastly... in case there is no other way than my code above... the second question is: what happens when Logstash bombards Elasticsearch with that many requests, so many times? How does Elasticsearch react to that?
Issuing a lot of queries will put extra load on the cluster, so I would avoid this for streams with high throughput rates.
For this to be efficient you need a filter that can cache client-side, and unfortunately I do not think the Elasticsearch filter does that yet. There is an open issue, which had some comments not too far back. Maybe @guyboertje can provide some further details?
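In the meantime, one crude way to cache client-side is to do the lookup once in the ruby filter's init block rather than per event (a sketch, assuming the rate changes rarely enough that fetching it once at pipeline start is acceptable):

filter {
  ruby {
    # Fetch the rate once when the pipeline starts and keep it in an instance variable
    init => "
      require 'net/http'
      require 'json'
      require 'uri'
      response = Net::HTTP.get_response(URI.parse('http://localhost:9200/test/config/1'))
      @rate = response.code == '200' ? JSON.parse(response.body)['_source']['rate'].to_i : 0
    "
    code => "event.set('new_field', event.get('Installs').to_i * @rate)"
  }
}

That keeps it at one query per pipeline start instead of one per event, at the cost of not seeing updates to the rate until the pipeline restarts.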
Another option is to run a second LS pipeline that takes the data out of ES with an ES input plugin and writes it to a KV or JSON file that is used by the translate filter in the first LS pipeline.
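A rough sketch of such a second pipeline (index, query, field names and paths are assumptions; translate can read a two-column CSV dictionary, so the file is written as key,value lines):

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test"
    query => '{ "query": { "match_all": {} } }'
    # schedule => "*/5 * * * *"   # optionally re-pull on a cron schedule
  }
}
output {
  file {
    path => "/etc/logstash/rates.csv"
    # One "key,value" line per document, loadable by translate as a CSV dictionary
    codec => line { format => "%{config_id},%{rate}" }
  }
}

Note that the file output appends, so this works best as a one-off run or with the file cleaned up between runs.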
I think this could be very error prone. As it is not possible to control when the translate plugin will refresh, it could end up refreshing halfway through the file being written. I would probably prefer having a script prepare the file and then just replace it when complete.
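For example, a small script along these lines (endpoint, index and paths are placeholders) could build the dictionary in a temporary file and then rename it over the old one, so the translate filter never sees a half-written file:

require 'net/http'
require 'json'
require 'uri'

# Fetch the lookup documents from Elasticsearch
uri = URI.parse('http://localhost:9200/test/config/_search?size=1000')
result = JSON.parse(Net::HTTP.get_response(uri).body)

# Build "key,value" CSV lines from the hits
lines = result['hits']['hits'].map { |hit| "#{hit['_id']},#{hit['_source']['rate']}" }

# Write to a temp file first, then rename; rename is atomic on the same filesystem,
# so the translate filter sees either the old file or the complete new one
tmp = '/etc/logstash/rates.csv.tmp'
File.write(tmp, lines.join("\n") + "\n")
File.rename(tmp, '/etc/logstash/rates.csv')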
That wasn't right, but I also tried with their "version" and I get the same error:
Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<Sequel::AdapterNotFound:
java::mongodb.jdbc.MongoDriver not loaded>, :backtrace=>
["/Users/akapit/workspace/outflink/outflink_platform/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/sequel-5.7.1/lib/sequel/adapters/jdbc.rb:44:in `load_driver'",
Well, it is true that we don't have an adapter for MongoDB. AFAIR, when a specific adapter is not found a generic one is used. You may have trouble with the generic adapter because it does vanilla SQL only.
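For what it's worth, against a plain SQL database a jdbc_streaming lookup would look roughly like this (driver, connection string, query and field names are placeholders), and it caches results client-side:

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/postgresql.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/config"
    jdbc_user => "logstash"
    jdbc_password => "secret"
    statement => "SELECT rate FROM config WHERE id = :id"
    parameters => { "id" => "config_id" }
    target => "rate"
    # Cached lookups, so repeated ids do not hit the database for every event
    use_cache => true
    cache_expiration => 300.0
    cache_size => 1000
  }
}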