event.remove method not working inside the aggregate filter's code block

Hi All,

I am new to the ELK stack. I am trying to remove the field called "attributes" while aggregating data inside the code block, but it is not removing the already existing "attributes" for the corresponding "id"; it only updates the existing attributes.

The expected result is that "attributes" should be removed if it already exists, and a new field with the same name "attributes" should be created from the selected SQL results.

Below is the code I tried so far:

filter {
    if "my_index" in [tags] {

        aggregate {
            task_id => "%{employee.id}"
            code => "
                event.remove('attributes')
                map['id'] = event.get('employee.id')
                map['attributes'] ||= {}
                attributename = event.get('attributename')
                attributevalue = event.get('attributevalue')
                map['attributes'][attributename] = attributevalue
                event.cancel()
            "
            push_previous_map_as_event => true
            timeout => 3
        }

        mutate {
            add_field => { "custom_index_name" => "my_index" }
        }
    } else {
        mutate {
            add_field => { "custom_index_name" => "%{type}" }
        }
    }
}

This will remove the [attributes] field from the event that is being aggregated, but you then call event.cancel to delete that event, so it doesn't really matter. The event that is flushed from the map will have the [attributes] field your code aggregated into the map.

I tried removing event.cancel(), but I still can't delete the already existing attributes and create new ones using map['attributes']. Could you please suggest how to change my code?
Thanks.

I do not understand what you mean by this. The aggregate filter will create a new event containing whatever is in the map. That means it only has the fields that you have added to the map in your aggregate filter.
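For illustration, a minimal sketch of the code block under that model, reusing the field names from the original config: since the flushed event is built only from the map, it contains just [id] and [attributes], and no event.remove call is needed:

aggregate {
    task_id => "%{employee.id}"
    code => "
        map['id'] ||= event.get('employee.id')
        map['attributes'] ||= {}
        # each source row contributes one key/value pair to the map's hash
        map['attributes'][event.get('attributename')] = event.get('attributevalue')
        event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
}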

Thanks! I got it: the aggregated data will be saved in the map, and other events cannot access it.
If so, how do I update map['attributes'] with the newly fetched values? Suppose I already have the following inside attributes:

attributes: { A: 1, B: 2, C: 3, D: 4 }
attributes coming from SQL: { A: 2, B: 5, C: 1 }

With the above code, A, B, and C update properly, but D: 4 is still there and cannot be deleted. So I am trying to remove attributes entirely and create a new attributes field with the values that come from SQL.
Can you please help me with this?

D: 4 will only be there if there is an event for that employee.id that contains

"attributename" => "D"
"attributevalue" => "4"

Thanks for the clarification. In that case, how can we replace the existing attributes completely with the new attributes from SQL? I mean, how do we replace the attributes A, B, C, D (existing) above with A, B, C (from SQL)?

I am still confused, but just had another idea. Are you saying that you have an existing document in elasticsearch where the [attributes] field is "{ A: 1, B: 2, C: 3, D: 4 }", and if you fetch a new row from the jdbc input that has "{ A: 2, B: 5, C: 1 }" you want to replace the [attributes] field on the document with whatever comes out of logstash? If so, aggregate is not the way to go. Do you need to overwrite the entire document, or just replace the [attributes] field?

You will probably need to ask a new question to get an answer. If you answer the questions above I can help you tomorrow with what to ask in the new question.

Yes, I need the existing data inside the attributes object to be removed completely. After that, I need to insert into attributes the values that come from SQL. I hope this is clear now, @Badger. Could you please alter the above logstash.conf for this case?

Still need a bit more detail. You can overwrite the document in elasticsearch by using document_id => "%{employee.id}". However, if you have additional fields and logstash is only processing [attributes], overwriting the document will lose all the other fields. That's why I asked: do you need to overwrite the entire document, or just replace the [attributes] field?
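For illustration, a minimal sketch of that overwrite approach (hosts and index name assumed from elsewhere in the thread). The default action, "index", replaces the stored document wholesale, so any field that is not on the Logstash event disappears from elasticsearch:

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "my_index"
        # the same id on every run, so each new event replaces the old document
        document_id => "%{employee.id}"
    }
}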

Hi @Badger, yes, I need to overwrite the document using the ID. The other fields are overwritten properly using the above code. If new attributes are available, I need to replace the attributes field completely using the ID. My worry is that the attributes field alone is not being overwritten with the new data.

Use output { stdout } and show us the event, then show us the same document in elasticsearch.
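For reference, a minimal debug output; the rubydebug codec is what produces the event dump shown below:

output {
    stdout { codec => rubydebug }
}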

Hi @Badger , This is the format of the event in stdout.

{
    "custom_index_name" => "my_index",
             "@version" => "1",
                   "id" => 375283,
                 "name" => "Test Data",
           "@timestamp" => 2023-04-24T09:56:47.000Z,
           "attributes" => {
                "Name" => "Jack",
                 "Age" => "27",
        "City" => "Chennai"
    },
                 "tags" => [
        [0] "_aggregatefinalflush"
    ]
}

In Elasticsearch, the document looks like this:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_index",
        "_id" : "375283",
        "_score" : 1.0,
        "_source" : {
          "tags" : [
            "_aggregatefinalflush"
          ],
          "@timestamp" : "2023-04-24T09:56:47.000Z",
          "attributes" : {
            "Name" => "Jack",
                 "Age" => "27",
        "City" => "Chennai"
          },
          "@version" : "1",
          "name" : "Test Data",
          "id" : 375283,
          "custom_index_name" : "my_index"
        }
      }
    ]
  }
}

OK, so the final flush event has three attributes, and elasticsearch has the same three attributes. I do not see an issue.

Yes, @Badger, now the counts match. But if I re-run and SQL returns more than 3 items, all of them new, the existing 3 items are not getting deleted. I need those 3 attributes to be deleted. That's the issue I'm facing.

What does the elasticsearch output look like in your logstash configuration?

This is the output part of my configuration.

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "my_index"
        document_id => "%{id}"
        document_type => "%{type}"
        action => "update"
        doc_as_upsert => true
    }
}

Here I am trying to update the index. The other fields mentioned above update/override properly, but the attributes field alone is not updated to match the SQL result.

I think you have misunderstood what that does. See this thread.
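For illustration (my reading of the elasticsearch output semantics, not quoted from the linked thread): action => "update" performs a partial update, which merges the incoming [attributes] hash into the stored one key by key, so stale keys such as D survive. Switching to the default action, "index", would replace the stored document instead, at the cost of dropping any fields that are not on the event:

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "my_index"
        document_id => "%{id}"
        # "update" with doc_as_upsert merges the event into the existing document;
        # "index" (the default) replaces the whole document, removing stale keys
        action => "index"
    }
}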

I suggest you ask a new question, saying: I am repeatedly fetching rows from a database. I insert them into elasticsearch using the unique key as the document_id. For any fields not on the current document, I want to add the missing columns to the existing document. For fields that do exist on the current document (including hashes), I want to overwrite them. How can I do this?

I've created the new topic here with the same details you suggested. The link you shared above also covers exactly the case I am trying to achieve. Kindly help me with this, @Badger.
Thanks

I cannot help at this point (I do not even run elasticsearch), but you have now posted a much clearer question that I suspect one of the other regulars can respond to. Good luck!