Logstash create doc if not exist else append array


#1

Elasticsearch: 6.4.2
Logstash: 6.4.2

Goal: Index the document if it has not been indexed before; if the document already exists, append to its "item" array.

Input data round 1:

{ "item": ["1"] }

Input data round 2:

{ "item": ["2"] }

Desired data in Elasticsearch:

{ "item": ["1","2"] }

The request below indexes the document the first time it is seen and appends to "item" if the document already exists. However, it always appends, even if the value is already in the array. For example, if it is run three times in a row, the result is { "item": ["1","1","1"] }

POST /test/doc/20/_update?pretty
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.item.add(params.itemNew)",
    "params": {"itemNew": "1"}
  },
  "upsert": {
    "item": ["1"]
  }
}

With the added logic, the script first checks whether the value is already in "item" and appends it only if it is not. Before that check even happens, the document is indexed from the "upsert" body if it does not exist yet.

POST /test/doc/30/_update?pretty
{
  "script": {
    "lang": "painless",
    "source": "if(ctx._source.item.contains(params.itemNew)) ctx.op = 'noop'; else (ctx._source.item.add(params.itemNew))",
    "params": {"itemNew": "1"}
  },
  "upsert": {
    "item": ["1"]
  }
}
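To make the intended behaviour concrete, here is a minimal Python sketch that models it with a plain dictionary instead of Elasticsearch. The function name upsert_append and the in-memory store are hypothetical and only illustrate the create / noop / append decision; this is not how Elasticsearch implements it.

```python
# In-memory stand-in for an index: document id -> _source dict (hypothetical model).
def upsert_append(store, doc_id, item_new):
    """Mimic the scripted upsert: create if missing, else append only if absent.

    Returns "created", "noop" or "updated".
    """
    if doc_id not in store:
        # Corresponds to the "upsert" body; the script does not run on creation.
        store[doc_id] = {"item": [item_new]}
        return "created"
    items = store[doc_id]["item"]
    if item_new in items:
        # Corresponds to ctx.op = 'noop'.
        return "noop"
    # Corresponds to ctx._source.item.add(params.itemNew).
    items.append(item_new)
    return "updated"


store = {}
results = [upsert_append(store, "30", value) for value in ["1", "1", "1", "2"]]
print(results)      # ['created', 'noop', 'noop', 'updated']
print(store["30"])  # {'item': ['1', '2']}
```

Sending "1" three times leaves a single "1" in the array, and the later "2" is appended, which is the desired { "item": ["1","2"] }.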

I'm trying to convert either of the above examples to Logstash. The best I have managed is an approximation of my first example, which appends regardless of whether the value is already present. The problem is that it concatenates strings instead of appending to an array: if the document already exists with item "1" and I update it with item "2", it becomes item "12".

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    doc_as_upsert => true
    script => 'ctx._source.item += "%{[item]}"'
    action => "update"
  }
}

Update:
This works to append to an array (as my first example does). It is a plain append with no logic to determine whether the value is already present, i.e. it will produce { "item": ["1","1","1"] }

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "test20"
        doc_as_upsert => true
        script => 'ctx._source.item.add("%{[item]}")'
        action => "update"
        retry_on_conflict => 3
    }
}

#2

This is what I came up with. Note that you must generate your own fingerprint value so that you control the document ID. That way, when the script checks whether "item" should be appended, you know exactly which document the comparison is made against.

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "test7"
        document_id => "%{[@metadata][fingerprint]}"
        doc_as_upsert => true
        script => 'if(ctx._source.item.contains("%{[item]}")) return true; else (ctx._source.item.add("%{[item]}"))'
        action => "update"
        retry_on_conflict => 3
    }
}
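As a rough illustration of why a self-generated fingerprint gives you control over the document ID, here is a small Python sketch. It assumes the ID is a hash of some key value that identifies the document; the key values "host-a" and "host-b" and the choice of SHA-1 are hypothetical and are not taken from my actual pipeline.

```python
import hashlib


def fingerprint(key_value):
    """Return a stable hex ID for a key value (SHA-1 chosen only for illustration)."""
    return hashlib.sha1(key_value.encode("utf-8")).hexdigest()


# Hypothetical key values standing in for whatever identifies a document.
id_a1 = fingerprint("host-a")
id_a2 = fingerprint("host-a")
id_b = fingerprint("host-b")

# The same key always maps to the same ID, so repeated events update one document;
# a different key maps to a different ID and therefore a different document.
print(id_a1 == id_a2, id_a1 == id_b)  # True False
```

Because the ID is derived from the event rather than auto-generated, the update always targets the same document for the same key.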

It does appear to compile a whole lot of scripts, forcing me to change script.max_compilations_rate to a crazy value like "90000/1m". I was not able to figure out how to use params (which appears to be the recommended way to reduce script compilations), even when I put params.event.get("item") into the script.

PUT _cluster/settings
{
  "transient": {
    "script.max_compilations_rate": "90000/1m"
  }
}

The above is likely causing my current issue of running out of memory. If anyone knows how to use params in the above, I'd appreciate it.


#3

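I discovered that interpolating the value into the script with "%{[item]}", as in my previous post, causes a WHOLE LOT of script compilations, because every distinct value produces a different script source. I set script.max_compilations_rate to a very high number to get around it. This eventually caused errors that took me a while to figure out.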

The error was 'java.lang.OutOfMemoryError: Compressed class space', and my issue was most likely due to compiling too many unique scripts combined with a memory leak. This is somewhat described here: https://github.com/elastic/elasticsearch/issues/19396

I changed to the script below, which causes extremely few compilations, and I don't have the memory issue anymore.

script => 'if(ctx._source.item.contains(params.event.get("item")[0])) return true; else (ctx._source.item.add(params.event.get("item")[0]))'
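To show why the params version compiles so much less, here is a rough Python sketch. It assumes one compilation per distinct script source and simply counts distinct strings; the helper names are hypothetical and nothing here talks to Elasticsearch.

```python
def inline_source(value):
    # The value is baked into the script text, as "%{[item]}" interpolation does.
    return ('if(ctx._source.item.contains("' + value + '")) return true; '
            'else (ctx._source.item.add("' + value + '"))')


def param_source(value):
    # The script text is constant; the value travels separately with the event.
    return ('if(ctx._source.item.contains(params.event.get("item")[0])) return true; '
            'else (ctx._source.item.add(params.event.get("item")[0]))')


def count_compilations(sources):
    """Count distinct script sources, assuming one compilation per distinct source."""
    return len(set(sources))


values = [str(n) for n in range(1000)]
inline_count = count_compilations(inline_source(v) for v in values)
param_count = count_compilations(param_source(v) for v in values)
print(inline_count, param_count)  # 1000 1
```

With 1000 distinct item values, the interpolated form yields 1000 different script sources, while the params form yields a single source that can be compiled once and reused.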

p.s. thanks for the help from the community ....:expressionless:

