How to combine two records into one with Logstash, and call a filter script before saving into Elasticsearch


I want to do some aggregation and transformation with Logstash on an input data stream, in the following steps:

  1. Combine the two input metric events for a single transaction (one from the transaction server, one from the database) into one record. A transaction may produce 1 or 2 events, since some transactions do not access the database, and the arrival order of the two events is not guaranteed.

  2. Then, call a script/program that performs some calculations on the transaction's metric fields, including querying and comparing against data from another Elasticsearch index, and store the calculated results in the output Elasticsearch index.

Any ideas or help? Thanks.

I guess this question is too broad. Can you be more specific?
These are more specifications than technical questions per se.
What led you to this requirement, what have you tried so far, what are you thinking of...
There is more than one way to do this, so it depends on what you have in mind.

Mehdi, thanks for your reply.

Because I'm new to Logstash plugins, I currently have no idea how to approach a solution.
Let me describe my question more specifically.

For example, I am using a Logstash HTTP input on port 8080 to ingest metric events from my transaction server and my database server. The transaction server runs the business application and calls the database when needed; some transactions don't call the database at all. The metrics from the transaction server (TS) and database server (DB) arrive independently, but for each transaction the TS and DB metrics always carry a unique identifier (uow) that correlates them, like this:
event 1: uow-1, trans-id, timestamp-TS, TS-server name, TS-metrics1(response time), TS-metrics2(TS-cpu-time),...
event 2: uow-1, trans-id, timestamp-DB, DB-server name, DB-metrics1(SQL-count), DB-metrics2(DB-locks count),...

PS1. For transactions that didn't access the database, event 2 is missing, so only event 1 is received for that transaction.
PS2. Event 2 may arrive before event 1, because the two are transferred independently.

Now, my requirements are:

  1. When Logstash receives these two events, combine them into one record, like:
    event3: uow-1, trans-id, timestamp-TS, TS-server-name, TS-metrics1(response time), TS-metrics2(TS-cpu-time),..., timestamp-DB, DB-server name, DB-metrics1(SQL-count), DB-metrics2(DB-locks count),....
  2. Next, I want to use the Logstash ruby filter plugin to process this event3 (if event2 is missing, event3 is simply event1). I need to compare some of this transaction's metrics against their baselines (stored in another Elasticsearch index, pre-generated earlier by some ML models) to detect metric anomalies, calculate the deviation between each metric and its baseline, and then store the calculated results in an Elasticsearch index.

I hope that is clearer.

1. To my knowledge Logstash is largely stateless, meaning data flows and streams through it.
To that effect, data is processed "as it comes"; apart from the aggregate filter, there is no memory for it to process multiple events and merge them.
So basically, when Logstash receives an event from the input, it flows down through the filters and gets spit out by the output, and that's it. Logstash then moves on to the next event, and so on.
Otherwise you need some external merging mechanism (I'm not sure the ELK stack alone is strictly the right fit for that).
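One caveat to the above: Logstash does ship an aggregate filter that keeps an in-memory map keyed by a task id and can merge correlated events. A rough sketch, assuming uow as the correlation key and the field names from your events; the 30-second timeout is illustrative:

```
filter {
  aggregate {
    # Correlate the TS and DB events on their shared uow identifier.
    task_id => "%{uow}"
    code => "
      # Copy this event's fields into the shared map (whichever event
      # arrives first), then drop the original event.
      event.to_hash.each { |k, v| map[k] ||= v }
      event.cancel
    "
    # Flush the merged map as a new event when no further event arrives,
    # which also covers transactions that never touch the database.
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "uow"
    timeout => 30
  }
}
```

Keep in mind the aggregate filter only works correctly with a single pipeline worker (`pipeline.workers: 1`), and its buffered state is lost if Logstash restarts, so it may or may not fit your throughput and reliability needs.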

2. You could use a ruby filter to do all sorts of things, including what you describe. Keep in mind it is not very easy to maintain, though.
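For instance, the comparison logic inside the ruby filter could be kept to a couple of small helpers. This is only a sketch, assuming each baseline is stored as a mean and standard deviation per metric; the field names and the 3-sigma threshold are illustrative, and the Elasticsearch lookup for the baseline is left out:

```ruby
# Hypothetical helpers for a Logstash ruby filter: given a metric value and
# its pre-computed baseline (mean / standard deviation), compute a z-score
# deviation and flag an anomaly when it exceeds a threshold.

def deviation(value, baseline_mean, baseline_std)
  # No meaningful deviation if the baseline has no spread.
  return nil if baseline_std.nil? || baseline_std.zero?
  (value - baseline_mean) / baseline_std.to_f
end

def anomaly?(z, threshold = 3.0)
  !z.nil? && z.abs > threshold
end

# Inside the filter's code block you would call them roughly like:
#   z = deviation(event.get('TS-metrics1'), baseline['mean'], baseline['std'])
#   event.set('response_time_deviation', z)
#   event.set('response_time_anomaly', anomaly?(z))
```

For the baseline lookup itself, Logstash's elasticsearch filter plugin can query another index and copy fields onto the event, which may be simpler than issuing the query from ruby code.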

Have you considered other tools and libraries? Spark, which is more geared toward data processing? Maybe NiFi? I may be wrong, but I am under the impression that what you are describing is more of a full-fledged ETL use case...