I am trying to deduplicate my events on the basis of the timestamp and operation fields, but it is not working.

My log events (two identical events that should be collapsed into one):

{
          "priority" => 13,
              "host" => "172.31.63.35",
       "consistency" => "\"ONE\"",
            "source" => "\"127.0.0.1",
              "type" => "scylladb",
          "severity" => 5,
         "logsource" => "ip-172-31-63-35.ec2.internal",
       "source_port" => "0\"",
           "program" => "scylla-audit",
         "server_ip" => "\"172.31.63.35",
          "facility" => 1,
        "table_name" => "\"demo\"",
          "@version" => "1",
         "operation" => "\"create table demo (ID int Primary key);\"",
          "username" => "\"cassandra\"",
          "category" => "\"DDL\"",
           "message" => "\"172.31.63.35:0\", \"DDL\", \"ONE\", \"demo\", \"mykeyspace\", \"create table demo (ID int Primary key);\", \"127.0.0.1:0\", \"cassandra\", \"false\"",
         "timestamp" => "2024-01-23T06:12:14.000Z",
    "severity_label" => "Notice",
             "error" => "\"false\"",
    "facility_label" => "user-level",
     "keyspace_name" => "\"mykeyspace\"",
       "server_port" => "0\"",
        "@timestamp" => 2024-01-23T06:12:14.000Z
}


{
          "priority" => 13,
              "host" => "172.31.63.35",
       "consistency" => "\"ONE\"",
            "source" => "\"127.0.0.1",
              "type" => "scylladb",
          "severity" => 5,
         "logsource" => "ip-172-31-63-35.ec2.internal",
       "source_port" => "0\"",
           "program" => "scylla-audit",
         "server_ip" => "\"172.31.63.35",
          "facility" => 1,
        "table_name" => "\"demo\"",
          "@version" => "1",
         "operation" => "\"create table demo (ID int Primary key);\"",
          "username" => "\"cassandra\"",
          "category" => "\"DDL\"",
           "message" => "\"172.31.63.35:0\", \"DDL\", \"ONE\", \"demo\", \"mykeyspace\", \"create table demo (ID int Primary key);\", \"127.0.0.1:0\", \"cassandra\", \"false\"",
         "timestamp" => "2024-01-23T06:12:14.000Z",
    "severity_label" => "Notice",
             "error" => "\"false\"",
    "facility_label" => "user-level",
     "keyspace_name" => "\"mykeyspace\"",
       "server_port" => "0\"",
        "@timestamp" => 2024-01-23T06:12:14.000Z
}

The aggregate filter I used:

aggregate {
		task_id => "%{timestamp}%{operation}"
		code => "
		  map['timestamp'] ||= event.get('timestamp')
		  map['operation'] ||= event.get('operation')
		  
		  if map['timestamp'] < event.get('timestamp')
			event.set('query_result', event.get('result'))
			map['timestamp'] = event.get('timestamp')
		  else
			event.cancel()
		  end
		"
		push_map_as_event_on_timeout => true
		timeout_task_id_field => "timestamp"
		timeout => 5
	  }

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.