Update nested object using script with params in Logstash Elasticsearch output

Hello everyone,

I would like to update a document that contains a nested object. I've already seen that this seems possible using a script. However, it appears that we can't pass params in the Logstash Elasticsearch output ...

My mapping for the type concerned is:

"Array_sensorsMaterialsTable": {
    "dynamic": "strict",
    "properties": {
        "@timestamp": {
            "type": "date"
        },
        "materials_sensorsMaterialsTable_columns": {
            "dynamic": "strict",
            "type": "nested",
            "properties": {
                "sensorMaterialsHumidity": {
                    "type": "integer"
                },
                "sensorMaterialsTemperature": {
                    "type": "integer"
                },
                "sensorMaterialsId": {
                    "type": "integer"
                }
            }
        },
        "host": {
            "type": "ip"
        },
        "array_name": {
            "index": "analyzed",
            "type": "string"
        }
    }
}

I want to use Logstash in order to:

  • Create the document if it does not exist
  • Update the document if it exists, by updating the nested object

And my Logstash configuration is:

elasticsearch {
    action => "update"
    index => "index"
    document_type => "type"
    document_id => "id"
    doc_as_upsert => true
}

Thank you for your help

In case there is no response to the above: can someone tell me the best way to store an array with columns in Elasticsearch using Logstash, if we receive the entries of the array one by one?

@LetMeR00t I've got the same situation: we can't use params in Logstash... Did you work it out? If so, how did you do it?

Hello CharlesLdy,

Because I used arrays in my project, I thought about using nested objects with a script.
However, it seems that nested objects are not well supported by Kibana, so I changed my design.
So now, if I have an "Array1" type in my index "index1", each document of "Array1" is one entry of my array (input data).

Anyway, I found that you can update an Elasticsearch document from Logstash using its "_id" (which you can customize so you can find the document again later!). Be careful to use a unique "_id" per document.
So, imagine you have an "Array1" type in your index "index1" with three columns "C1", "C2" and "C3".
First you index your new data (with an id that you build and can easily rebuild), then you use the "update" action with that id to update your array when new data comes in.
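For example, a minimal output sketch (the hosts value and the "C1"/"C2" fields are just placeholders from my example, adapt them to your data):

elasticsearch {
    hosts => ["localhost:9200"]       # adapt to your cluster
    index => "index1"
    document_type => "Array1"
    document_id => "%{C1}_%{C2}"      # unique id built from your own fields
    action => "update"
    doc_as_upsert => true             # creates the document if it does not exist yet
}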

Well, if you can change the format of your data, it could be a great idea to send all your data at once :slight_smile:

Feel free to ask questions if I'm not clear :slight_smile:

Thanks for your quick reply. Sorry, my English is bad, so I don't understand exactly what you mean.

In my case, one doc may be updated three times with different new fields or the same fields. When the same field is updated again, the values are kept in an array and the new value is simply appended to it. So I must have a unique "id" per doc in ES.

Before I set the script type to file, I was using a dynamic script, so I could read the data with '%{field}'. But that seems to cause a memory leak. Now I have changed the script type to file, and I need to send the fields from Logstash's elasticsearch output to the script as params. That's how I found this discussion, but it seems there is no way to do it.
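To be clear, by "dynamic script" I mean something roughly like this (a simplified sketch, not my real config), where the field values are interpolated into the script string itself, so every event produces a different script text that Elasticsearch has to compile:

elasticsearch {
    document_id => "%{eventId}_%{productId}"
    action => "update"
    doc_as_upsert => true
    script_lang => "groovy"
    script_type => "inline"
    script => "ctx._source.orderIds += '%{orderId}'"   # a new Groovy script per event
}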

I can show you my script code (it's just one of my three update scripts):

def orderConfirm(String orderId, double orderGmv, String orderConfirmTime, int buyCount) {
    if (ctx._source.orderConfirmNum) {
        ctx._source.orderConfirmNum += 1
    } else {
        ctx._source.orderConfirmNum = 1
    }

    if (ctx._source.orderConfirmTimes) {
        ctx._source.orderConfirmTimes += orderConfirmTime
    } else {
        if (ctx._source.orderConfirmTime) {
            ctx._source.orderConfirmTimes = [ctx._source.orderConfirmTime, orderConfirmTime]
        } else {
            ctx._source.orderConfirmTimes = [orderConfirmTime]
        }
    }

    if (ctx._source.orderIds) {
        ctx._source.orderIds += orderId
    } else {
        if (ctx._source.orderId) {
            ctx._source.orderIds = [ctx._source.orderId, orderId]
        } else {
            ctx._source.orderIds = [orderId]
        }
    }

    if (ctx._source.orderGmvs) {
        ctx._source.orderGmvs += orderGmv
    } else {
        if (ctx._source.orderGmv) {
            ctx._source.orderGmvs = [ctx._source.orderGmv, orderGmv]
        } else {
            ctx._source.orderGmvs = [orderGmv]
        }
    }

    if (ctx._source.buyCounts) {
        ctx._source.buyCounts += buyCount
    } else {
        if (ctx._source.buyCount) {
            ctx._source.buyCounts = [ctx._source.buyCount, buyCount]
        } else {
            ctx._source.buyCounts = [buyCount]
        }
    }

    if (!ctx._source.orderConfirm) {
        ctx._source.orderConfirm = 1
    }

    ctx._source.orderId = orderId
    ctx._source.orderGmv = orderGmv
    ctx._source.orderConfirmTime = orderConfirmTime
    ctx._source.buyCount = buyCount
}

orderConfirm(orderId, orderGmv, orderConfirmTime, buyCount)

The params in that function call are exactly what I need to read from Logstash.

How does your data come into Logstash?
For my part, I don't use any script to update my data, so I would like to know how your system works :slight_smile:

PS: Are you French, by the way? :slight_smile:

:grinning: I'm Chinese. Nice to meet you.

My data is product and user behaviour data. Each doc in ES contains five parts of data:

  1. user request data,

  2. product exposure data,

  3. user click product data,

  4. add-to-cart data,

  5. user place-an-order data.

So the doc id in ES is eventId+productId. When one part of the data is created in ES, the other parts will be added to the same doc by doc id.
I can show you the Logstash conf related to the script above:

input {
    kafka {
        zk_connect => "kafka001.elk.idc:2181,kafka002.elk.idc:2181,kafka003.elk.idc:2181"
        group_id => "logstash-es"
        topic_id => "track-orderconfirm-log"
        codec => plain
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
    }
}

filter {
    clone {}
    json {
        source => "message"
        remove_field => ["message"]
        remove_field => ["kafka"]
        add_field => {
            "orderConfirm" => 1
            "orderConfirmNum" => 1
        }
    }
    mutate {
        convert => {
            "orderConfirm" => "integer"
            "orderConfirmNum" => "integer"
        }
        merge => {
            "orderGmvs" => "orderGmv"
            "orderConfirmTimes" => "orderConfirmTime"
            "orderIds" => "orderId"
            "buyCounts" => "buyCount"
        }
    }
}

output {
    stdout {
        codec => rubydebug { metadata => true }
    }

    elasticsearch {
        hosts => ["esdata001.elk.idc:9200","esdata002.elk.idc:9200","esdata003.elk.idc:9200"]
        index => "logstash-tracking-%{+YYYY.MM.dd}"
        document_type => "logmodel"
        document_id => "%{eventId}_%{productId}"
        workers => 5
        flush_size => 20000
        idle_flush_time => 10
        template_overwrite => true
        action => "update"
        doc_as_upsert => true
        script => "orderconfirm-es"
        script_lang => "groovy"
        script_type => "file"
        template => "/data/elk/logstash_conf/mapping/tracking.json"
    }
}

My production is still using the dynamic script, and I have hit an out-of-memory error in PermGen twice. I think the dynamic script is the cause, so I want to change the script type to file.
You can see that my PermGen and heap size keep growing.

(heap and PermGen usage graphs for node1, node2 and node3)

I thought the source fields from Logstash might automatically be available as params for the script, but I don't know how to use them.

Why did you clone your message?

I think you can simplify your Logstash configuration by specifying "codec => json" in the kafka input and adjusting your json filter accordingly.
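Something like this (an untested sketch based on your input):

kafka {
    zk_connect => "kafka001.elk.idc:2181,kafka002.elk.idc:2181,kafka003.elk.idc:2181"
    group_id => "logstash-es"
    topic_id => "track-orderconfirm-log"
    codec => json                 # the message is parsed here, so no separate json filter is needed
    reset_beginning => false
    consumer_threads => 5
    decorate_events => true
}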

I don't understand what your script is doing here. Well, if you want to modify some data, you should change it in the filter part rather than in your script, but perhaps I misunderstood what you want to do?

By the way, you can use an "elasticsearch" filter to retrieve some data from ES: https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html
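For instance, a rough sketch (the query is made up for your eventId field, and the exact syntax of the fields option depends on your plugin version):

filter {
    elasticsearch {
        hosts => ["esdata001.elk.idc:9200"]
        query => "eventId:%{eventId}"    # hypothetical query to find the already-indexed doc
        # then copy fields from the matched document into the current event,
        # e.g. fields => { "orderIds" => "previous_orderIds" }
    }
}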

Otherwise, you could use a ruby filter to change your data however you want :slight_smile:
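A tiny example of what I mean (this uses the Logstash 2.x event API and your buyCount/buyCounts field names, so treat it as a sketch):

filter {
    ruby {
        # append the incoming buyCount value to the buyCounts array on the event
        code => "event['buyCounts'] = (event['buyCounts'] || []) + [event['buyCount']]"
    }
}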

About the clone, I think you are right and I will give it a try.

I use the script to do things like auto-increment or modify an existing field in ES, such as incrementing an integer field or appending a value from the new event to an array field.

Otherwise, I can only code in Java and Python :joy: maybe I should study Ruby.

@CharlesLdy, @LetMeR00t, when using a script to update documents with Logstash, you can use the variable 'event' within your script, which contains the Logstash event itself!

Sample Logstash event: {"@timestamp": "2016-10-13T12:19:02.000Z", "id": "1", "name": "foo"}

elasticsearch {
    action => "update"
    document_id => "%{id}"
    doc_as_upsert => true
    script => 'ctx._source.name = event["name"]'
    script_var_name => "event"
    script_type => "dynamic" # this works with all types
}

It's used the same way as in a ruby filter :smiley:
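@CharlesLdy, applied to your file-based script, that would mean keeping your output as it is and just adding script_var_name, then reading the values from the event inside the Groovy file instead of expecting params (a sketch, untested):

elasticsearch {
    # ... same options as in your output above ...
    action => "update"
    doc_as_upsert => true
    script => "orderconfirm-es"
    script_lang => "groovy"
    script_type => "file"
    script_var_name => "event"
    # and inside orderconfirm-es.groovy:
    # orderConfirm(event["orderId"], event["orderGmv"], event["orderConfirmTime"], event["buyCount"])
}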

@dchauviere @LetMeR00t
Yes, it works for me. Such a simple solution. I spent two days on this; I had misunderstood 'script_var_name'.

Thank you very much, all of you. I'm very glad to meet you :grin: