Logstash Update or Insert - logic for updating if existing

Hey guys,

I am having a bit of an issue applying some logic to mimic functionality we previously had in MySQL.

The idea is that after parsing I create a unique ID, Doc_id, by concatenating three fields. I then query Elasticsearch to see whether a document with that Doc_id already exists and, if so, pull its stored value. In the output I have an if statement that compares previousValue (from Elasticsearch) with EndpointValue (from the incoming document) and sends the event to the elasticsearch output only if they are NOT the same. This should also cover the case where the document does not exist yet, since the values will not match. If they are the same, nothing should happen.

On the elasticsearch output I chose to do an upsert, which from my reading should update an existing record or create a new one. The following works somewhat: it inserts records, but not in the way I was hoping.

I'd like the record to be updated if it exists and the value is different, so that @timestamp gets updated and @version is incremented. However, what I see appears to be a new document every time with the latest @timestamp, but @version is always set to 1.

Perhaps I am overcomplicating this or missing something?

I have logs that look like:
Test-USPS|uspsUri|global|http://test.usps.com

Pattern:
NOTPIPE [^|]+

My config is:
input {
  beats {
    port => 5044
  }
}
filter {
  grok {
    patterns_dir => "/opt/logstash/patterns"
    # The literal pipes are escaped; an unescaped | would be regex alternation.
    match => [
      "message", "%{WORD:Environment}-%{NOTPIPE:ApplicationName}\|%{NOTPIPE:EndpointName}\|%{NOTPIPE:ignore}\|%{NOTPIPE:EndpointValue}"
    ]
  }
  mutate {
    add_field => { "Doc_id" => "%{Environment}%{ApplicationName}%{EndpointName}" }
    remove_field => [ "ignore" ]
  }
  elasticsearch {
    hosts => ["localhost"]
    query => "Doc_id=%{Doc_id}"
    fields => { "EndpointValue" => "previousValue" }
  }
}
output {
  if "%{previousValue}" != [EndpointValue]{
    elasticsearch {
      hosts => ["localhost"]
      index => "esb-endpoints"
      action => "update"
      doc_as_upsert => "true"
      document_id => "%{Doc_id}"
    }
  }
}

So it appears the if statement is causing the issue. I added the following debug output to get an idea of what is happening, and even though the values are equal, as the JSON output shows, it still treats them as different:
if "%{EndpointValue}" != "%{previousValue}" {
  file {
    path => "/tmp/testOut.txt"
  }
}

The following is logged:
{
  "ApplicationName": "USPS",
  "offset": 134,
  "input_type": "log",
  "source": "/tmp/test.log",
  "message": "Test-USPS|uspsUri|global|https://www.usps.com/business/web-tools-apis",
  "type": "log",
  "previousValue": "https://www.usps.com/business/web-tools-apis",
  "tags": ["beats_input_codec_plain_applied", "diff"],
  "EndpointValue": "https://www.usps.com/business/web-tools-apis",
  "@timestamp": "2017-03-13T21:42:40.662Z",
  "EndpointName": "uspsUri",
  "Doc_id": "TestUSPSuspsUri",
  "@version": "1",
  "beat": {
    "hostname": "localhost",
    "name": "localhost",
    "version": "5.2.2"
  },
  "host": "localhost",
  "Environment": "Test"
}

OK, I finally got the if statement to work as expected. I changed:

if "%{EndpointValue}" != "%{previousValue}" {

to

if [EndpointValue] != [previousValue] {

Does anyone know the difference, and why the latter works and the former does not?
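
For what it is worth, my understanding is that Logstash conditionals do not apply %{...} sprintf substitution to quoted strings, so a quoted "%{field}" is compared as literal text, while [field] is a field reference that reads the value from the event. The fragment below is only a sketch of that difference, reusing the field names and the debug file path from this post:

    output {
      # Both sides are literal strings: "%{EndpointValue}" and "%{previousValue}".
      # The two literals are never equal, so this branch runs for every event.
      if "%{EndpointValue}" != "%{previousValue}" {
        file { path => "/tmp/testOut.txt" }
      }

      # Both sides are field references, so the values stored on the event are compared.
      if [EndpointValue] != [previousValue] {
        file { path => "/tmp/testOut.txt" }
      }
    }

The same reasoning would explain the first version of the output, where the literal "%{previousValue}" was compared against the value of [EndpointValue] and so never matched.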

Finally, the if statement is now working; however, the update into Elasticsearch is still not working.

output {
  if [EndpointValue] != [previousValue] {
    elasticsearch {
      hosts => ["localhost"]
      index => "esb-endpoints"
      action => "update"
      doc_as_upsert => "true"
      document_id => "%{Doc_id}"
    }
  }
}

My hope was that the update would modify the field that has changed as well as increment the version number. Do I need to add logic to Logstash manually to pull @version and update it with a +1?


Well, I have finally gotten the functionality I wanted, for the most part. Since I could not get update to increment the @version field, as I had originally expected the update action to do, I resorted to creating my own field called epVersion and using Ruby code to increment it.
So I grab epVersion and the endpoint value with the elasticsearch filter plugin. If epVersion doesn't exist (i.e. it wasn't already in the record), I set it to 0. I make sure epVersion is an integer, and then, if the endpoint value has changed, I increment it. Quite a workaround for what should be a simple @version increment on update, but it works.

elasticsearch {
  hosts => ["localhost"]
  query => 'Doc_id="%{Doc_id}"'
  fields => { "EndpointValue" => "previousValue" }
  fields => { "epVersion" => "epVersion" }
}
if ![epVersion] {
  mutate {
    add_field => { "epVersion" => 0 }
  }
}
mutate {
  convert => { "epVersion" => "integer" }
}
if [EndpointValue] != [previousValue] {
  ruby {
    code => '
      event.set("epVersion2", event.get("epVersion").to_i + 1)
    '
  }
}
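
To show how this lines up with the output section from earlier, here is a rough sketch of the remaining pieces. One assumption to flag loudly: the snippet above writes the incremented value to epVersion2, while the sketch below writes it back to epVersion so that the next lookup would read the updated count. That may or may not match the full config, so treat it as an illustration only.

    filter {
      # ... grok, mutate and the elasticsearch lookup as shown above ...
      if [EndpointValue] != [previousValue] {
        ruby {
          # Assumption: overwrite epVersion itself rather than setting epVersion2.
          code => 'event.set("epVersion", event.get("epVersion").to_i + 1)'
        }
      }
    }
    output {
      # Only changed or new endpoints reach Elasticsearch.
      if [EndpointValue] != [previousValue] {
        elasticsearch {
          hosts => ["localhost"]
          index => "esb-endpoints"
          action => "update"
          doc_as_upsert => "true"
          document_id => "%{Doc_id}"
        }
      }
    }

With this layout a new endpoint would start at epVersion 1, because the field defaults to 0 and the comparison against a missing previousValue is true.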
