Concerns with scaling Logstash when using the Aggregate filter

The aggregate filter is working great in my testing, but I'm concerned about scale because it is stateful and requires setting filter workers to 1 (-w 1 flag). We have about 150 servers, each of which will generate ~2GB of logs per day. The log structure is generally like this:

sameTaskId, start, username, size
sameTaskId, networkstats
anotherTaskId, start, username, size
sameTaskId, end, authAuditInfo
anotherTaskId, networkstats
anotherTaskId, end, authAuditInfo

We love the idea of aggregating in order to reduce storage costs and to avoid aggregating in every query, but are worried about 1 worker keeping up in real-time. How can we solve this? Based on my research, I have a few starter ideas, none of which sound ideal:

  1. Static load balancing rules - We could put our Logstash instances behind a load balancer configured to always route a given server's traffic to the same instance. Cons: we would need to manage a large Logstash cluster, and this kind of load balancing is not resilient to failures or to adding/removing machines.
  2. Use pipeline-to-pipeline communication to fan out to multiple pipelines per machine as described here. Cons: this could increase the throughput of a single machine, but we would probably still need the static load balancing described above.
  3. Instead of the aggregate filter, use an entity-centric index as described here. I need to understand this better, but I'm concerned about the comment that the data would also be stored in the standard index. Also, I'm not sure where in the pipeline this aggregating would occur. During ingestion?
  4. Instead of the aggregate filter, use a batch process to periodically aggregate data after it is in Elasticsearch as described here. I'm open to this, but would need an example of how this could be done. Would this use the experimental "Rollup" feature or the "Transforms" feature? If not, would we have to use the API or a client SDK?

At the end of the day, I'd love to aggregate our logs, but want to be sure we can scale and observe system behavior in real-time. Thanks in advance for the input!

Transform is an implementation of the entity-centric index idea; the discuss post you referenced is from before transform existed.

With transform you would write all your events into an index (better: a data stream), and you would use transform to create a session out of the events, basically combining x documents into 1 document. From the example you gave, I guess you want e.g. the duration (end - start); for this you need transform, as rollup does not provide anything like that. Both Rollup and Transform can run continuously and process the data as it is fed in. After the data has been processed by transform/rollup, you are free to delete the original data to reclaim storage space.
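To make that concrete, here is a minimal sketch of what a continuous transform for your case could look like. The index names, the taskId keyword field and the @timestamp field are assumptions, not something I know from your setup:

```
PUT _transform/task-sessions
{
  "source": { "index": "task-logs" },
  "dest":   { "index": "task-sessions" },
  "pivot": {
    "group_by": {
      "taskId": { "terms": { "field": "taskId" } }
    },
    "aggregations": {
      "startTimestamp": { "min": { "field": "@timestamp" } },
      "endTimestamp":   { "max": { "field": "@timestamp" } }
    }
  },
  "sync": {
    "time": { "field": "@timestamp", "delay": "60s" }
  },
  "frequency": "1m"
}
```

You would then start it with POST _transform/task-sessions/_start, and from that point on it checkpoints new data continuously according to the sync settings.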

Can you provide an example of how the data should look after it has been combined?

Nice! I was wondering if that was the case.

Happily! To flesh out my original example for a single task, the source log data looks like this:

taskId-1, start, timestamp, username, size
taskId-1, timestamp, networkstats
taskId-1, end, timestamp, authAuditInfo

We would like to aggregate this into a single record. For example,

taskId-1, startTimestamp, endTimestamp, duration, username, size, networkstats, authAuditInfo

As you can see, each log entry has different data fields and we would like a single record per taskId with all data fields, plus a new duration field where duration = endTimestamp - startTimestamp. Can this be done through the Transforms feature?
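For extra clarity, here is roughly how I picture the combined document as JSON; the values below are just placeholders, not real data:

```
{
  "taskId": "taskId-1",
  "startTimestamp": "2021-06-01T10:00:00Z",
  "endTimestamp": "2021-06-01T10:42:00Z",
  "duration_ms": 2520000,
  "username": "someUser",
  "size": 12345,
  "networkstats": "…",
  "authAuditInfo": "…"
}
```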

If so, this sounds great! My remaining concern would be about increased load on the Elasticsearch servers and how "real-time" this could be, given our data volume (150 servers generating ~2GB logs each per day).

We have some recommendations in our scaling docs. In your case I would especially look into point 4: "limit the scope of the source query". The reason: with only a terms query, every checkpoint would still query all of the data. But if you know that one session can be at worst e.g. 30 minutes long, it helps a lot to add a range query to the source with now-30m, or better now-30m/m (which rounds it down to the minute).
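As a sketch, assuming an @timestamp field and the 30 minute worst case, the source section of the transform could look like this:

```
"source": {
  "index": "task-logs",
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-30m/m" } } }
      ]
    }
  }
}
```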

I'll definitely keep that in mind. However, in my case a task can run for 1-2 days, so I can only limit so much.

Based on my extended example, are you able to comment on whether I can accomplish my needs with Transforms? It's not clear to me whether it can do the things I need: add fields to the same task's record as its events trickle in, and calculate a duration.

Well, then 2d plus some worst-case-based extra still makes a difference compared to querying everything.

Yes, calculating the duration is mentioned in the docs. With respect to adding fields: transform re-calculates the bucket, so if an event for an existing session comes in, that session gets recalculated. The docs, again, explain how this works and which heuristics transform uses; that is useful to understand.
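For the duration itself, the usual pattern is a bucket_script over the min/max timestamps inside the pivot aggregations. A sketch, again assuming an @timestamp field:

```
"aggregations": {
  "startTimestamp": { "min": { "field": "@timestamp" } },
  "endTimestamp":   { "max": { "field": "@timestamp" } },
  "duration_ms": {
    "bucket_script": {
      "buckets_path": {
        "start": "startTimestamp.value",
        "end": "endTimestamp.value"
      },
      "script": "params.end - params.start"
    }
  }
}
```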

If there isn't an out-of-the-box solution, you always have two generic options: scripted_metric and ingest pipelines.
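For example, a scripted_metric aggregation can pick up a field that only appears on one of the events of a session. This is only a sketch and assumes username is a keyword field in your mapping:

```
"username": {
  "scripted_metric": {
    "init_script": "state.value = null",
    "map_script": "if (doc.containsKey('username') && doc['username'].size() > 0) { state.value = doc['username'].value }",
    "combine_script": "return state.value",
    "reduce_script": "for (v in states) { if (v != null) { return v } } return null"
  }
}
```

Ingest pipelines are the other angle: you can set a pipeline on the transform's dest so the combined document is reshaped as it is indexed.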
