How to add to a Set collection in Logstash Elasticsearch output plugin?

Hi,

I am trying to add a field to an array (or collection) in elasticsearch and preferably delete a field from logstash event map using elasticsearch output plugin. How can I accomplish it?

P.S. For my situation I cannot use aggregate filter as I have multiple elasticsearch output plugin. So the best way to populate the array is may be using the script (either in the ingest node pipeline or through logstash elasticsearch script).

e.g., consider that below are the inputs-

{
model_id: 1,
schematic_id: 101,
description: "test1"
},
{
model_id: 2,
schematic_id: 101,
description: "test1"
},
{
model_id: 2,
schematic_id: 102,
description: "test2"
},
{
model_id: 3,
schematic_id: 101,
description: "test1"
}

Goal is to update both model and schematic index populating the array of the other field.

Below is one of the sample code that I tried out of many. I am unable to find the correct syntax and the correct way of doing it.

elasticsearch {
        hosts => ["localhost:9200"]
        index => "schematics_test"
        document_id => "%{schematic_id}"
        
        # Note: doc_as_upsert is mutually exclusive of scripted_upsert
        # doc_as_upsert => true
        # action => "update"
        
        script_lang => "painless"
        script_type => "inline"
        scripted_upsert => true

        # script => "ctx._source.models.add(%{model_id})"
        # script => "ctx._source.models.add(params.event.get('model_id'))"

        parameters => {"modelId" => "%{model_id}"}
        script => "
                    tmp = new HashSet();
                    tmp.addAll(ctx._source.models);
                    tmp.addAll(params.modelId);
                    ctx._source.models = tmp.toArray();
                "
    }

Ideal output:

 {
    schematic_id: 101,
    description: "test1",
    models: [1, 2, 3]
 },
 {
    schematic_id: 102,
    description: "test2",
    models: [2]
 }

Aggregate filter would definitely work but only for schematic index and I will loose the ability to update the model index which also contains an array of schematics.

Please help!

Just for reference, below was the aggregate filter, which I am no longer using-

# aggregate {
#     task_id => "%{schematic_page_id}"
#     code => "  
#         map['schematic_id'] = event.get('schematic_id')
#         map['description'] = event.get('description')
        
#         map['models'] ||= []
#             map['models'] << {                        
#             'model_id' => event.get('model_id')                        
#         }               
#         event.cancel()
#     "
#     push_previous_map_as_event => true
#     timeout => 15
# }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.