Filebeat to Elastic-Agent Swap

We are currently using Filebeat 8.13.x to ship Zeek and Suricata sensor data via Kafka, with Logstash consuming from Kafka and pushing to Elasticsearch.

We are in the midst of moving from Filebeat to Elastic Agent. We have installed the integrations for Suricata and Zeek. However, we are running into an issue:

"error"=>{"type"=>"illegal_argument_exception", "reason"=>"pipeline with id \[%{\[@metadata\]\[pipeline\]}\] does not exist".

This is our current Logstash output


output {
  elasticsearch {
    hosts => [ "nodes" ]
    index => "logs-nsm-endpoint"
    action => "create"
    manage_template => false
    pipeline => "%{[@metadata][pipeline]}"
    user => "logstash"
    password => "supersecretpassword"
    ssl => true
    cacert => '/etc/logstash/ssl/elastic-certificate.crt'
    ssl_certificate_verification => true
  }
}

We have tried changing the pipeline to the one that got installed when adding the integration assets, but it just errored out on all the data.

Does anyone have a clear cut guide or any way forward or suggestions?

Hi Carl,

You’re hitting a common gap when introducing Logstash between Elastic Agent integrations and Elasticsearch.

The pipeline => "%{[@metadata][pipeline]}" only works when that metadata is set upstream (typically by Beats/Agent in a direct flow). In your Kafka → Logstash architecture, this metadata is not preserved, so Elasticsearch receives an invalid pipeline name.
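One common defensive pattern for this exact error (a sketch, not specific to your setup; reuse your own connection options in both branches) is to apply the pipeline option only when the metadata field actually exists:

```
output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts    => [ "nodes" ]
      pipeline => "%{[@metadata][pipeline]}"
      # ... remaining options as in your current output ...
    }
  } else {
    elasticsearch {
      hosts => [ "nodes" ]
      # same options, but with no pipeline setting
    }
  }
}
```

This stops the literal string %{[@metadata][pipeline]} from reaching Elasticsearch, though it does not by itself route events to the integration pipelines.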

More importantly, Elastic Agent integrations (Suricata/Zeek) are designed to work with data streams + managed ingest pipelines, not custom index + pipeline routing via Logstash.

Best path forward:

1. Move to data streams in the Logstash output (data_stream => true)
2. Align with the integration naming convention (logs-&lt;dataset&gt;-&lt;namespace&gt;)
3. Let Elasticsearch automatically resolve the correct ingest pipelines installed by the integration

If you keep the current model, you’ll need to manually map and maintain pipeline names, which defeats the purpose of using integrations.

Summary:
This is not just a missing pipeline; it's a mismatch between the integration design (data streams + managed pipelines) and the current ingestion architecture.

There are a couple of issues here.

First, when using Elastic Agent integrations you do not have full control over the data stream name; the only thing you can change is the namespace, everything else depends on the integration.

So you will have something like this: logs-integration.dataset-&lt;namespace&gt;, where you can only change the namespace.

Second, when using Logstash with Elastic Agent integrations, the source data needs to have the data_stream fields that Logstash will use to know which data stream to send the event to; you do not need [@metadata][pipeline].

Your output will be like the one in this documentation.

Something like this:

output {
  elasticsearch {
    hosts => [ "nodes"]
    user => "logstash"
    password => "supersecretpassword"
    data_stream => true    
    ssl => true
    cacert => '/etc/logstash/ssl/elastic-certificate.crt'
    ssl_certificate_verification => true
  }
}

Since you are sending your data to Kafka using Filebeat, your events will not have the required data_stream fields; until you change to Elastic Agent you will need to add these fields in your Logstash pipeline.

For Suricata this is simple, as it has only one data stream, so you would need something like this in your Logstash pipeline:

filter {
    mutate {
        add_field => {
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "suricata.eve"            
            "[data_stream][namespace]" => "default"
        }
    }
}

This would index the data into the logs-suricata.eve-default data stream and use the correct ingest pipeline.

For Zeek this would be extremely complicated, since the Elastic Agent integration has 43 different data streams, one for each dataset, so you would need to map each kind of log to its respective data stream to be able to use the correct ingest pipeline.
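To illustrate the kind of mapping that would be needed, here is a partial sketch. It assumes a hypothetical field (here called [log_type]) that carries the Zeek log name, which you would have to derive from your own events:

```
filter {
  # Common fields shared by every Zeek data stream
  mutate {
    add_field => {
      "[data_stream][type]" => "logs"
      "[data_stream][namespace]" => "default"
    }
  }
  # [log_type] is a hypothetical field holding the Zeek log name;
  # derive it from whatever your events actually contain.
  if [log_type] == "conn" {
    mutate { add_field => { "[data_stream][dataset]" => "zeek.connection" } }
  } else if [log_type] == "dns" {
    mutate { add_field => { "[data_stream][dataset]" => "zeek.dns" } }
  } else if [log_type] == "http" {
    mutate { add_field => { "[data_stream][dataset]" => "zeek.http" } }
  }
  # ... and so on for the remaining ~40 datasets
}
```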

I would recommend that you replace Filebeat with Elastic Agent and start sending the data to Kafka using Elastic Agent, this way the events would have the correct data_stream fields that logstash requires.

Keep in mind that Elastic Agent is expected to send data directly to Elasticsearch. You can use Logstash, but it is considered an advanced case, and what you can do will depend on the license you have for your cluster.

@Rafa_Silva @leandrojmp We changed to data_stream => true and removed the pipeline option. We also had to create the following filter:

filter {
  json {
    source => "message"
  }
}

This works well for Suricata. We have now pivoted to Zeek which uses the same Logstash configuration, but the data is not being ingested into Elasticsearch. We have tried with and without the filter. We also outputted the data to a file and it worked.

For reference, this is our logstash.conf file:

input {
  kafka {
    codec => json
    bootstrap_servers => ' kafka-servers '
    client_id => "${HOSTNAME}"
    group_id => "logs-nsm"
    topics => ['nsm2']
    add_field => { "[meta][logstash_host]" => "${HOSTNAME}" }
    consumer_threads => 3
    decorate_events => false
    security_protocol => "SSL"
    ssl_truststore_location => "truststore.jks"
    ssl_truststore_password => "supersecret"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => [ "nodes" ]
    data_stream => true
    user => "logstash"
    password => "supersecretpassword"
    ssl => true
    cacert => '/etc/logstash/ssl/elastic-certificate.crt'
    ssl_certificate_verification => false
  }
}

I'm not sure how this could have worked for Suricata, unless the source events have information about the data stream.

Can you share a sample message from Kafka for Suricata and another one for Zeek?

The sample data is required to understand what is happening before and after Logstash.

Also share a sample of the output you send to a file.
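One way to capture such samples is a temporary debug output (a sketch; remove it once done). The metadata option makes any [@metadata] fields visible as well:

```
output {
  stdout { codec => rubydebug { metadata => true } }
}
```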

As mentioned before, suricata has a single ingest pipeline, Zeek has 43 different ingest pipelines, one for each data type.

Good progress. Moving to data_stream => true and removing the pipeline was the right step.

For Suricata working and Zeek not, this strongly indicates a data stream field mismatch, not a Logstash/Kafka issue.

With data_stream => true, Elasticsearch expects these fields to be correctly set in the event:

• data_stream.type
• data_stream.dataset
• data_stream.namespace

Suricata works because it has a single dataset, so even a simple/static mapping works.

For Zeek it’s different: each log type (conn, dns, http, ssl, etc.) maps to a different dataset, so if data_stream.dataset is missing or incorrect, Elasticsearch will reject the event or silently fail to index it.

Right now your pipeline only parses JSON:

json { source => "message" }

it does not enrich the event with the required data_stream.* fields for Zeek.

What to check next:

  1. Inspect a raw Kafka message for Zeek
    → confirm if fields like event.module, event.dataset, or any Zeek log type indicator exist
  2. Verify what your events look like after Logstash (stdout/file output)
    → confirm if data_stream.* fields are present
  3. If missing, you’ll need to map Zeek log types → datasets, for example:
  • conn → zeek.connection
  • dns → zeek.dns
  • http → zeek.http
    (this mapping must be explicit in Logstash)
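One way to keep such a mapping explicit and maintainable is the translate filter (a sketch; [log_type] is a hypothetical field you would derive from your events, and the dictionary must be extended to cover all Zeek datasets):

```
filter {
  translate {
    source => "[log_type]"                  # hypothetical field holding the Zeek log name
    target => "[data_stream][dataset]"
    dictionary => {
      "conn" => "zeek.connection"
      "dns"  => "zeek.dns"
      "http" => "zeek.http"
    }
    # events with no match keep data_stream.dataset unset and can be
    # routed elsewhere or dropped in a later conditional
  }
}
```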

Important point:

Zeek integration is not “plug and play” through Logstash unless you recreate this mapping logic. That’s why Elastic Agent → Elasticsearch direct is the recommended path.

Summary:

Suricata works because it’s a single dataset.

Zeek fails because dataset routing is missing or incorrect.

We were able to figure it out with the help you provided. This is what we were doing wrong and what we did to fix our issue.

The Logstash Kafka input had codec => json; we found a statement about this in the documentation.

We removed the codec => json and left only the json filter:

filter {
  json {
    source => "message"
  }
}

Data is being populated as intended with Data Streams.
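For anyone following along, the working Kafka input then reduces to the original one minus the codec line (placeholders as before):

```
input {
  kafka {
    bootstrap_servers => ' kafka-servers '
    topics => ['nsm2']
    security_protocol => "SSL"
    ssl_truststore_location => "truststore.jks"
    ssl_truststore_password => "supersecret"
    # no codec => json here; the json filter parses [message] instead
  }
}
```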

FYI, the data in Kafka does have the required data_stream information.

data_stream.type
data_stream.dataset
data_stream.namespace