Moving to integrations and ingest pipelines, need Logstash aggregate filter plugin replacement

Previously we used Logstash to push data into Elastic. For some logs whose most important data was scattered across several lines, we used the Aggregate Filter Plugin to gather that data and then push a single document to Elastic.

Pseudo Example

2026-05-26 12:00 (thread-01) [msgid:123]: Received file [hello.txt]
2026-05-26 12:00 (thread-01) [msgid:123]: File is from [some part]
2026-05-26 12:01 (thread-01) [msgid:123]: File moved to [some/dir] in 20ms

{
    "msgid": 123,
    "file.name": "hello.txt",
    "part": "some part",
    "dir": "some/dir",
    "duration": 20
}
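For illustration, here is a minimal Python sketch of what the aggregate filter does for this example: parse each line, group by msgid, and merge the fields into one document. The line layout and the three message patterns are assumptions inferred from the pseudo example, not the real third-party format.

```python
import re

# Assumed line layout, inferred from the pseudo example:
# "<date> <time> (<thread>) [msgid:<id>]: <message>"
LINE_RE = re.compile(
    r"^\S+ \S+ \((?P<thread>[^)]+)\) \[msgid:(?P<msgid>\d+)\]: (?P<msg>.*)$"
)

# One pattern per message type; each contributes a few fields to the final document.
MESSAGE_PATTERNS = [
    re.compile(r"^Received file \[(?P<file_name>[^\]]+)\]$"),
    re.compile(r"^File is from \[(?P<part>[^\]]+)\]$"),
    re.compile(r"^File moved to \[(?P<dir>[^\]]+)\] in (?P<duration>\d+)ms$"),
]

# Regex group names cannot contain dots, so rename them to the target field names.
FIELD_NAMES = {"file_name": "file.name"}


def aggregate(lines):
    """Merge all lines sharing a msgid into one document (all kept in memory)."""
    docs = {}  # msgid -> document; dicts preserve insertion order
    for line in lines:
        m = LINE_RE.match(line)
        if not m:
            continue  # ignore lines that do not match the assumed layout
        msgid = int(m.group("msgid"))
        doc = docs.setdefault(msgid, {"msgid": msgid})
        for pattern in MESSAGE_PATTERNS:
            hit = pattern.match(m.group("msg"))
            if hit:
                for key, value in hit.groupdict().items():
                    # duration is numeric (milliseconds); everything else stays a string
                    doc[FIELD_NAMES.get(key, key)] = int(value) if key == "duration" else value
                break
    return list(docs.values())


if __name__ == "__main__":
    sample = [
        "2026-05-26 12:00 (thread-01) [msgid:123]: Received file [hello.txt]",
        "2026-05-26 12:00 (thread-01) [msgid:123]: File is from [some part]",
        "2026-05-26 12:01 (thread-01) [msgid:123]: File moved to [some/dir] in 20ms",
    ]
    print(aggregate(sample))
```

Unlike the real aggregate filter, this sketch never flushes: it holds every message until the input ends. A production version has to decide when a message is complete (for example on the "File moved" line, or after a period of inactivity), which is what the aggregate filter's end_of_task and timeout options handle.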

After moving to the Custom Filestream integration with ingest pipelines, I found no replacement for the Aggregate Filter Plugin.

I thought a workaround could be to update the documents by reusing the same _id, so that later lines replace or upsert the existing document. But it seems data streams require op_type=create, which (according to AI) means I'm not allowed to update documents in a data stream?

Any thoughts on how to handle this scenario when using integrations with ingest pipelines?

Unfortunately, if you need the aggregate filter, you should stick with Logstash. There is no replacement; it is one of the Logstash filters that have no equivalent in Elasticsearch.

There are other filters that also have no equivalent, like the split and translate filters (the closest equivalent is the enrich processor, but it works differently).

Thank you for your response.

Is it correct that data streams run in op_type=create mode, which makes it impossible to update existing documents when using features such as the Custom Filestream Log integration and the SQL integration? If so, is there a way to bypass this restriction?

You can use create, as you mentioned, or index to overwrite. From the documentation:

op_type string

Set to create to only index the document if it does not already exist (put if absent). If a document with the specified _id already exists, the indexing operation will fail. The behavior is the same as using the <index>/_create endpoint. If a document ID is specified, this parameter defaults to index. Otherwise, it defaults to create. If the request targets a data stream, an op_type of create is required.

Supported values include:

  • index: Overwrite any documents that already exist.
  • create: Only index documents that do not already exist.

Values are index or create.

The docs say data streams should be treated as append-only, with a small caveat in the "Append-only (mostly)" section:

Data streams are designed for use cases where existing data is rarely updated. You cannot send update or deletion requests for existing documents directly to a data stream. However, you can still update or delete documents in a data stream by submitting requests directly to the document’s backing index.
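As a rough sketch of that caveat, the Python function below builds a _bulk request body that overwrites a document through its backing index, using the bulk index action with if_seq_no and if_primary_term. It assumes the existing document was first found with a search run with seq_no_primary_term enabled; the function name and the backing index name are hypothetical.

```python
import json


def backing_index_update(hit, changes):
    """Build a _bulk NDJSON body that overwrites one data stream document by
    targeting its backing index (hit["_index"]) rather than the data stream.

    `hit` is assumed to come from a search run with seq_no_primary_term=true,
    so it carries _index, _id, _seq_no, _primary_term and _source.
    """
    action = {
        "index": {
            "_index": hit["_index"],  # the backing index, not the data stream name
            "_id": hit["_id"],
            # Optimistic concurrency control: the write fails if the document
            # changed since it was read.
            "if_seq_no": hit["_seq_no"],
            "if_primary_term": hit["_primary_term"],
        }
    }
    # The index action replaces the whole document, so send the merged source.
    source = {**hit["_source"], **changes}
    return json.dumps(action) + "\n" + json.dumps(source) + "\n"


if __name__ == "__main__":
    # Hypothetical hit; the backing index name is made up for illustration.
    hit = {
        "_index": ".ds-logs-example-default-2026.05.26-000001",
        "_id": "abc123",
        "_seq_no": 4,
        "_primary_term": 1,
        "_source": {"@timestamp": "2026-05-26T12:00:00Z", "msgid": 123, "file.name": "hello.txt"},
    }
    print(backing_index_update(hit, {"part": "some part"}), end="")
```

This has to run outside Elasticsearch (a script or service), since an ingest pipeline cannot search for the existing document first; each incoming line would cost a search plus a write.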

I'm in a pickle here, and I'm starting to think that moving away from Logstash was a mistake. My understanding is that Elasticsearch/Kibana and their features expect denormalized data, à la:

// this
{ msgid: 123, file.name: hello.txt, sender.name: somePart, dir: /some/path }

// instead of this
{ _id: 1, msgid: 123, file.name: hello.txt }
{ _id: 2, msgid: 123, sender.name: somePart }
{ _id: 3, msgid: 123, dir: /some/path }

But we have third-party software that spreads the important data across multiple log lines. I do not control the log format, so I cannot emit fully denormalized events at the source.

My goal is to end up with an index containing denormalized documents, so that I can efficiently use this for visualizations and statistics.

Since the Custom Filestream Log integration uses data streams, and data streams do not allow updates to documents, I cannot use the _id replacement/upsert workaround in an ingest pipeline. Also, ingest pipelines do not provide anything equivalent to Logstash's aggregate filter.
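For comparison, here is a minimal Python sketch of the _id upsert workaround as it would look against a regular index (not a data stream), sent by an external process rather than an ingest pipeline. It uses the bulk update action with doc_as_upsert and the msgid as a deterministic _id; the function name and the index name "file-transfers" are hypothetical.

```python
import json


def upsert_bulk_body(index, events, id_field="msgid"):
    """Build a _bulk NDJSON body that merges partial events into one document
    per id, using the bulk `update` action with doc_as_upsert.

    Only valid for a regular index: data streams reject update requests.
    """
    lines = []
    for event in events:
        doc_id = str(event[id_field])  # deterministic _id shared by all lines of a message
        lines.append(json.dumps({"update": {"_index": index, "_id": doc_id}}))
        # doc_as_upsert: create the document if missing, otherwise merge the fields.
        lines.append(json.dumps({"doc": event, "doc_as_upsert": True}))
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # The three partial documents from the example above, without their _id.
    partials = [
        {"msgid": 123, "file.name": "hello.txt"},
        {"msgid": 123, "sender.name": "somePart"},
        {"msgid": 123, "dir": "/some/path"},
    ]
    print(upsert_bulk_body("file-transfers", partials), end="")
```

If several lines for the same msgid can be written concurrently, the update action's retry_on_conflict option is worth considering.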

I tried creating a transform job (pivot), but denormalization does not seem like its intended purpose.

Considering how important denormalized documents are, I'm surprised I'm having such a hard time figuring out how to solve this problem, as I assume this is fairly common.

Perhaps Logstash is the actual solution, and integrations (e.g., Custom Filestream Log) and ingest pipelines are not meant to replace Logstash for this type of use case.