How to update an index based on fields other than the document_id using Logstash?


(Rajkumar Selvakumar) #1

Here is my use case:

I have created an Elasticsearch index (name: order_index) and I am able to load it with input from my database through the Logstash JDBC plugin (all is fine). The index has these fields: seq_id (database sequence), order_id, sender, receiver, processed_time.

The document_id is seq_id.

The combination order_id + sender + receiver is unique (once an order is received, the receivers are determined by routing logic), so there is more than one record for each order_id.

Now I have a CSV file, which has order_id and request_time.

I need to read the CSV (I use the Logstash csv plugin) and upsert into order_index.

Problem:

The CSV file has only order_id (no sender or receiver) and request_time.

I need to add the request_time to order_index based on order_id.

order_index cannot use order_id as document_id because order_id is not unique.


input {
  beats {
    port => "5044"
  }
}

filter {
  csv {
    separator => ","
    columns => ["order_id","request_time"]
  }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "order_index"
  }
}



(Guy Boertje) #2

Where does the CSV file come from? A database?
If not, how often is new data added to the file?

I think you could use a SQLite DB, import the CSV and use the jdbc_streaming filter to add the request date to all documents with the same order_id.
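A rough sketch of what that could look like, assuming the CSV has been imported into a SQLite table named jmeter_results (the table name and file paths are hypothetical):

```
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/sqlite-jdbc.jar"
    jdbc_driver_class => "org.sqlite.JDBC"
    jdbc_connection_string => "jdbc:sqlite:/path/to/jmeter.db"
    # Look up the request_time for this event's order_id
    statement => "SELECT request_time FROM jmeter_results WHERE order_id = :order_id"
    parameters => { "order_id" => "order_id" }
    target => "request_time_lookup"
  }
}
```

This filter would run in the pipeline that reads from the database, enriching each document with the matching request_time before it is indexed.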

What analysis are you intending to do in Kibana?


(Rajkumar Selvakumar) #3

Thanks for your response.

The CSV is generated from JMeter (load test runs).

The CSV file is generated at the end of a test run, so it is not frequently updated.

Importing the CSV into the database and then reading it through the JDBC input plugin would definitely work; that's an option I have.

However, I am trying to find a solution that bypasses the database. If Logstash provides an option to update existing documents in the index based on certain fields (similar to a search query), then I can use that instead of loading the CSV into the database and then streaming it to Elasticsearch.


(Guy Boertje) #4

Maybe scripted updates in the elasticsearch output might work for you?

This would be in a second LS pipeline that reads the CSV file. Be aware that the document you are trying to update must be in ES already. You may need to use the elasticsearch filter to add the sequence_id of the first doc matching against order_id. Alternatively you could try to add an array of sequence_id of all matching docs by order_id and then split the array into multiple docs.
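A rough sketch of the scripted-update idea, using the elasticsearch output plugin's scripted-update options (the field names come from this thread; it assumes seq_id has already been resolved onto the event, e.g. via the elasticsearch filter):

```
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "order_index"
    document_id => "%{seq_id}"
    action => "update"
    script_lang => "painless"
    script_type => "inline"
    # Copy the request_time from the incoming event onto the stored document
    script => "ctx._source.request_time = params.event.get('request_time')"
  }
}
```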


(Rajkumar Selvakumar) #5

Hi,

I have been busy with other stuff. I have started learning scripting in Elasticsearch. Let me try to create scripts and test whether that resolves my problem. I will share the updated Logstash config once done.

Thanks,
Raj


(Rajkumar Selvakumar) #6

I have found a solution, but a few issues have to be sorted out. Since I can read the CSV file, I extract the order_id, sender and receiver.
I use the ruby filter plugin to query the index based on order_id, sender and receiver to get the document id. Once the document id is available, I can use the regular upsert function as below.

I am sharing my configuration; please review and assist.

output {
  elasticsearch {
    hosts => ["host1:9200", "host2:9200", "host3:9200"]
    document_id => "%{my_doc_id}"
    index => "My_Index"
    doc_as_upsert => true
    action => "update"
    manage_template => true
  }
}

My ruby filter plugin:

input {
  beats {
    port => "5044"
  }
}

filter {
  csv {
    separator => ","
    columns => ["publish_time","order_id","sender","receiver"]
  }
  mutate {
    remove_field => [ "source", "message", "beat", "host" ]
  }
  ruby {
    code => "
      require 'elasticsearch'
      client = Elasticsearch::Client.new hosts: ['host1:9200', 'host2:9200', 'host3:9200']
      response = client.search index: 'My_Index', body: {
        query: {
          bool: {
            must: [
              { match: { 'order_id' => event.get('order_id') } },
              { match: { 'sender'   => event.get('sender') } },
              { match: { 'receiver' => event.get('receiver') } }
            ]
          }
        }
      }
      event.set('my_doc_id', response['hits']['hits'][0]['_id'])
    "
  }
}

Problem: the query returns numerous documents instead of one, because order_id, sender and receiver are treated as text. In Kibana, I can get unique results if I use .keyword (for example order_id.keyword), but when I use it inside the ruby plugin, I encounter the below error.

SyntaxError: (ruby filter code):4: syntax error, unexpected ':'
response = client.search index: 'perf_report_by_audit_and_jmeter', body: { query: { match: { 'order_id.keyword': event.get(order_id') } } }
^
eval at org/jruby/RubyKernel.java:1079
register at /bpms/ELK/logstash-5.6.3/vendor/bundle/jruby/1.9/gems/logstash-filter-ruby-3.0.4/lib/logstash/filters/ruby.rb:38
register at /bpms/ELK/logstash-5.6.3/vendor/jruby/lib/ruby/1.9/forwardable.rb:201
register_plugin at /bpms/ELK/logstash-5.6.3/logstash-core/lib/logstash/pipeline.rb:290
register_plugins at /bpms/ELK/logstash-5.6.3/logstash-core/lib/logstash/pipeline.rb:301
each at org/jruby/RubyArray.java:1613
register_plugins at /bpms/ELK/logstash-5.6.3/logstash-core/lib/logstash/pipeline.rb:301
start_workers at /bpms/ELK/logstash-5.6.3/logstash-core/lib/logstash/pipeline.rb:311
run at /bpms/ELK/logstash-5.6.3/logstash-core/lib/logstash/pipeline.rb:235
start_pipeline at /bpms/ELK/logstash-5.6.3/logstash-core/lib/logstash/agent.rb:398


(Rajkumar Selvakumar) #7

I have resolved the issue.

I used Ruby code with the elasticsearch gem to address the issue.
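For anyone landing here with the same .keyword syntax error: quoted-symbol keys such as 'order_id.keyword': are only valid from Ruby 2.2 onward, and Logstash 5.6 runs JRuby in 1.9 mode, hence the `unexpected ':'` error; string keys with => work on all versions. A minimal sketch of how the lookup could be built (index name, hosts and sample values below are assumptions based on this thread):

```ruby
require 'json'

# Build an exact-match lookup against the .keyword sub-fields, so a single
# matching document comes back instead of many analyzed-text matches.
# String keys with => avoid the `unexpected ':'` error on older JRuby.
def build_lookup_query(order_id, sender, receiver)
  {
    'query' => {
      'bool' => {
        'must' => [
          { 'term' => { 'order_id.keyword' => order_id } },
          { 'term' => { 'sender.keyword'   => sender } },
          { 'term' => { 'receiver.keyword' => receiver } }
        ]
      }
    }
  }
end

puts JSON.pretty_generate(build_lookup_query('ORD-1', 'sys-a', 'sys-b'))

# Inside the logstash ruby filter this would be used roughly as:
#   response = client.search index: 'order_index',
#                            body: build_lookup_query(event.get('order_id'),
#                                                     event.get('sender'),
#                                                     event.get('receiver'))
#   event.set('my_doc_id', response['hits']['hits'][0]['_id'])  # _id is metadata, not in _source
```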


(system) #8

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.