How can I combine multiple MySQL query events with the Logstash aggregate filter?

Hello,
I implemented a pipeline with the JDBC input plugin. My table in MySQL follows the EAV (Entity-Attribute-Value) structure, so I can't get all attributes related to one user (entity) in a single row with a MySQL query. I need to fetch the attributes as separate rows and combine them in the application layer (in my case, the Logstash filter stage).
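
For context, my input stage is a JDBC input along the lines of the sketch below; the driver path, credentials, and table and column names here are placeholders rather than my real schema:

    input {
      jdbc {
        jdbc_driver_library => "/path/to/mysql-connector-java.jar"   # placeholder driver path
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" # placeholder database
        jdbc_user => "logstash"
        jdbc_password => "secret"
        schedule => "* * * * *"
        # Hypothetical EAV join: one row per user / property / hero combination
        statement => "SELECT u.user_id, u.last_sync_time, u.name, u.level,
                             p.property_name, p.property_value,
                             h.hero_name, h.hero_level, h.hero_skill
                      FROM users u
                      JOIN user_properties p ON p.user_id = u.user_id
                      JOIN user_heroes h ON h.user_id = u.user_id"
      }
    }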

My question is: how can I combine the separate rows (events) into one row (event) in the Logstash filter stage?

For example, I get these events as a query result:

[user_id]  [last_sync_time]  [name]    [level]  [property_name]       [property_value]  [hero_name]   [hero_level]  [hero_skill]
1423352    1631916107        ❤RAGA3❤   247      user_yellow_artifact  65                hero_angel    78            15
1423352    1631916107        ❤RAGA3❤   247      user_purple_artifact  15                hero_angel    78            15
1423352    1631916107        ❤RAGA3❤   247      user_food             10638173          hero_victory  96            15
1423352    1631916107        ❤RAGA3❤   247      user_energy           574822            hero_victory  96            15

I want to combine these events using the following rules:

  1. All events with the same user_id must be combined into one event.
  2. Repeated fields such as last_sync_time, name, and level are collapsed into a single field.
  3. Each unique property_name (user_yellow_artifact, user_purple_artifact, and so on) becomes a field on the final combined event, holding its property_value.

The final result after combining should be like this:

[user_id]  [last_sync_time]  [name]    [level]  [user_yellow_artifact]  [user_purple_artifact]  [user_food]  [user_energy]  [hero_angel_level]  [hero_angel_skill]  [hero_victory_level]  [hero_victory_skill]
1423352    1631916107        ❤RAGA3❤   247      65                      15                      10638173     574822         78                  15                  96                    15

I think this can be done with the Logstash aggregate plugin, but that plugin is designed for combining log lines. How can I use it in my case?

Thanks for any help.


Correct. You could try something like this. You may not need the mutate/csv filters, depending on the input you are using:

    mutate { gsub => [ "message", "[\[\]]", "", "message", "\s+", " " ] }
    csv { separator => " "  autodetect_column_names => true }
    aggregate {
        task_id => "%{user_id}"
        code => '
            map["last_sync_time"] ||= event.get("last_sync_time")
            map["name"] ||= event.get("name")
            map["level"] ||= event.get("level")

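            # Pivot the EAV pair: each row contributes one property_name as a field holding its property_value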
            pName = event.get("property_name")
            pValue = event.get("property_value")
            map[pName] = pValue

            map["heroes"] ||= {}
            # Just keep overwriting these
            hName = event.get("hero_name")
            hLevel = event.get("hero_level")
            hSkill = event.get("hero_skill")
            map["heroes"][hName] = { "level" => hLevel, "skill" => hSkill }

            event.cancel
        '
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "user_id"
        timeout => 6
        timeout_tags => []
    }
    ruby {
        code => '
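            # Flatten the heroes hash into <hero_name>_level and <hero_name>_skill fields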
            heroes = event.get("heroes")
            if heroes
                heroes.each { |k, v|
                    event.set("#{k}_skill", v["skill"])
                    event.set("#{k}_level", v["level"])
                }
                event.remove("heroes")
            end
        '
    }

which produces

    "hero_angel_level" => "78",
      "last_sync_time" => "1631916107",
            "@version" => "1",
               "level" => "247",
"user_purple_artifact" => "15",
             "user_id" => "1423352",
         "user_energy" => "574822",
                "name" => "❤RAGA3❤",
  "hero_victory_skill" => "15",
"user_yellow_artifact" => "65",
    "hero_angel_skill" => "15",
          "@timestamp" => 2021-09-18T17:03:09.155Z,
  "hero_victory_level" => "96",
           "user_food" => "10638173"

Special thanks @Badger for your fully detailed solution :heart:.
