Logstash CSV - How to concatenate all values of one column into a new field?

Hi team,

I have the following records in a CSV file.

timestamp,module,page,userId,actionType,httpsessionId
2019-02-13 02:56:05.356,succession,talentsearch,cgrant1,scm.ts.list_saved_search,sid1
2019-02-13 02:56:05.358,succession,talentsearch,lokamoto1,scm.ts.list_saved_search,sid2
2019-02-13 02:56:05.358,succession,talentsearch,cgrant1,scm.ts.start_over,sid1
2019-02-13 02:56:05.360,succession,talentsearch,cgrant1,scm.ts.delete_saved_search,sid1
2019-02-13 02:56:05.361,succession,talentsearch,lokamoto1,scm.ts.search,sid2
2019-02-13 02:56:05.365,succession,talentsearch,lokamoto1,scm.ts.nominate,sid2

From the above data, I can derive two user scenarios from the actionType field (the scm.ts. prefix is not needed and should be removed).

userScenario
list_saved_search->start_over->delete_saved_search (cgrant1's user scenario in sid1)
list_saved_search->search->nominate (lokamoto1's user scenario in sid2)

How can I use Logstash to put the actionTypes into a new field (userScenario), grouped by module, page and userId? The actionTypes in the userScenario field need to be listed in ascending time order.

Thanks,
Cherie

You could use an aggregate filter to do this.

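    csv {
        # Take the column names from the first line of the file
        autodetect_column_names => true
    }
    aggregate {
        # One map (task) per HTTP session
        task_id => "%{httpsessionId}"
        code => '
            # Append this action to the running scenario string
            map["userScenario"] ||= ""
            map["userScenario"] += event.get("actionType") + "->"
            map["userId"] = event.get("userId")
            # Drop the per-row event; only the aggregated event is kept
            event.cancel
        '
        # Emit the map as a new event once the task times out
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "httpsessionId"
        # Seconds since the first event of the task
        timeout => 6
        timeout_code => '
            # Remove the trailing separator
            event.set("userScenario", event.get("userScenario").chomp("->"))
        '
    }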

Note that this requires running Logstash with "--pipeline.workers 1", so that all events pass through a single worker in order. As a result, it does not scale with your hardware. The code above copies only the userId field to the final event. If you want additional columns, add them to the map in the code option.
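
The filter above appends values in the order the events arrive and keeps the scm.ts. prefix. As a rough illustration of the same logic outside Logstash, here is a plain Ruby sketch that also strips the prefix and sorts by timestamp before joining. The helper name build_scenarios is hypothetical, the sample rows are the ones from the question, and it assumes the fixed-width timestamp format sorts correctly as a string and that String#delete_prefix is available (Ruby 2.5 or later).

```ruby
# Standalone sketch of the aggregation logic; it does not use Logstash.
# Column names and rows come from the CSV in the question.
CSV_TEXT = <<~DATA
  timestamp,module,page,userId,actionType,httpsessionId
  2019-02-13 02:56:05.356,succession,talentsearch,cgrant1,scm.ts.list_saved_search,sid1
  2019-02-13 02:56:05.358,succession,talentsearch,lokamoto1,scm.ts.list_saved_search,sid2
  2019-02-13 02:56:05.358,succession,talentsearch,cgrant1,scm.ts.start_over,sid1
  2019-02-13 02:56:05.360,succession,talentsearch,cgrant1,scm.ts.delete_saved_search,sid1
  2019-02-13 02:56:05.361,succession,talentsearch,lokamoto1,scm.ts.search,sid2
  2019-02-13 02:56:05.365,succession,talentsearch,lokamoto1,scm.ts.nominate,sid2
DATA

# Hypothetical helper: returns one scenario record per httpsessionId.
def build_scenarios(csv_text)
  lines = csv_text.lines.map(&:chomp).reject(&:empty?)
  header = lines.first.split(",")
  rows = lines.drop(1).map { |line| header.zip(line.split(",")).to_h }

  maps = {} # plays the role of the aggregate filter's per-task map
  rows.each do |row|
    map = (maps[row["httpsessionId"]] ||= { "userId" => row["userId"], "actions" => [] })
    # Keep the timestamp next to each action so the order can be enforced later.
    map["actions"] << [row["timestamp"], row["actionType"].delete_prefix("scm.ts.")]
  end

  maps.map do |session_id, map|
    # Sort by timestamp; the index keeps rows with equal timestamps in arrival order.
    ordered = map["actions"].sort_by.with_index { |(ts, _action), i| [ts, i] }
    {
      "httpsessionId" => session_id,
      "userId" => map["userId"],
      "userScenario" => ordered.map(&:last).join("->")
    }
  end
end

build_scenarios(CSV_TEXT).each do |s|
  puts format("%s (%s): %s", s["userId"], s["httpsessionId"], s["userScenario"])
end
```

Inside the aggregate filter, the same idea would mean storing the actions in an array in the map and doing the sort and join in timeout_code. I have not tested that variant in a pipeline.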


Thanks Badger. I attended the Machine Learning+ Canvas session last Friday and discussed this topic with the presenter as well. He gave me the same suggestion as you. Logstash offers a rich set of filters for different purposes.

Two more questions:

  1. When the actionType values are aggregated into a string for the same httpsessionId, they are already in ascending time order, right?

  2. About the timeout option:

  • If the timeout expires while the httpsessionId is still alive and Logstash has not finished aggregating the actionTypes for that session, what happens?
  • If the session is over and the aggregate filter finishes the aggregation within the timeout, the timeout_code won't be triggered, right? Does that have any side effect on the aggregation for the next sessionId?
