Beats and Elastic Agent data streams

Hello folks,

I am researching a bit into how to send Beats data to Elasticsearch through Logstash using data streams instead of regular indices.

Beats -> Logstash (data stream config) -> Elasticsearch

I know that some Beats have their own ingest pipelines, so I need to be careful with that, but with that in mind, is there any issue with using the same data stream that is automatically created by an Elastic Agent and its integrations?
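For reference, the module pipelines a Beat installs can be listed from Dev Tools (the wildcard below is just an illustration):

# Filebeat module pipelines get names like filebeat-<version>-<module>-<fileset>-pipeline
GET _ingest/pipeline/filebeat-*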

For example, let's say that I want to send Metricbeat data using the following data stream:

type: metrics
dataset: {metricbeat event.dataset}
namespace: default

I should have something like this:

metrics-system.process-default

This is the same structure used by the Elastic Agent System integration.

What issues could arise from doing this?

Thanks,
Matheus!

The fields exported by Metricbeat and Elastic Agent are slightly different, so I would expect some things not to work right if you use the same data stream for both, like dashboards breaking or even mapping conflicts.

For example, when people migrate from Beats to Elastic Agent, one of the steps is to modify or recreate any custom dashboards.

Well, if there is a module that is the same as an integration, you can actually do this...

BTW... installing integration assets is a great "hack", errrr, I mean workaround, for lots of things. You can basically use Filebeat and trick it into acting like a full Agent, so you can use the pipelines, dashboards, etc., send the data to the data stream, and voilà (YMMV).
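If you want to try it, the integration assets can be installed from the integration's Settings page in Kibana, or (on newer 8.x stacks) via the Fleet API from Dev Tools; the package version below is just an example:

# Installs the System integration assets: index templates, ingest pipelines, dashboards
# (the version is illustrative; use whatever your stack offers)
POST kbn:/api/fleet/epm/packages/system/2.1.3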

Example hack with Filebeat to use an integration:

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  enabled: true
  paths:
    - /var/log/*.log
  # Promote the data_stream.* fields to the event root so the output
  # below can build the data stream name from them
  fields_under_root: true
  fields:
    data_stream.type: logs
    data_stream.dataset: system.syslog
    data_stream.namespace: default

# Disable Filebeat's own ILM and template setup so the integration's
# index template (installed with the integration assets) applies instead
setup.ilm.enabled: false
setup.template.enabled: false

output.elasticsearch:
  hosts: ["http://localhost:9200"]
  # Routes each event to <type>-<dataset>-<namespace>
  index: "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}"

This works for lots of integrations: nginx, firewall logs, etc.

But it will probably not work for metrics, because Metricbeat does not support time series data streams (TSDS) like the Elastic Agent integrations do.

You would have to test it. The Filebeat use case does work (I tested it for several integrations), but Metricbeat I'm not sure about and have not tested.
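One way to check whether a given integration data stream is TSDS-backed is to look at the index mode of its backing indices (the data stream name here is just an example):

# "time_series" means TSDS; a classic data stream has no mode set
GET metrics-system.cpu-default/_settings?filter_path=**.index.mode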

I'm doing a few tests, and so far I have:

1. Created a component template with the Metricbeat default mappings and settings (the ones from the metricbeat setup command, version 8.13.4).

2. Adjusted the Metricbeat component template, changing 'agent.hostname' from 'alias' to 'keyword'.

3. Created an index template composed of 'metrics@settings', 'metrics-metricbeat@package' and 'ecs@mappings', with the index pattern set to 'metrics-metricbeat.*-*' (steps 1-3 are sketched below).

4. Created the Logstash pipeline.
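Roughly what steps 1-3 look like in Dev Tools. The full Metricbeat mappings exported in step 1 are omitted for brevity, and the priority value is arbitrary:

# Steps 1-2: component template with the Metricbeat mappings,
# overriding agent.hostname from 'alias' to 'keyword'
PUT _component_template/metrics-metricbeat@package
{
  "template": {
    "mappings": {
      "properties": {
        "agent": {
          "properties": {
            "hostname": { "type": "keyword" }
          }
        }
      }
    }
  }
}

# Step 3: index template matching the rerouted data streams
PUT _index_template/metrics-metricbeat
{
  "index_patterns": ["metrics-metricbeat.*-*"],
  "data_stream": {},
  "priority": 200,
  "composed_of": ["metrics@settings", "metrics-metricbeat@package", "ecs@mappings"]
}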

The Logstash pipeline:

input {
  beats {
    port => 5045
    ssl_enabled => false
  }
}

filter {
  # Beats 5.0.0 compatibility: rename pre-ECS fields to their ECS equivalents
  if [beat] {
    mutate {
      rename => { "beat" => "agent" }
      rename => { "source" => "[log][file][path]" }
      rename => { "[metricset][module]" => "[event][module]" }
    }
  }
  # Older Beats do not set host.name; fall back to the agent name
  if ![host][name] {
    mutate {
      add_field => { "[host][name]" => "%{[agent][name]}" }
    }
  }
  # Default to the <beat>-pipeline ingest pipeline created below
  if ![@metadata][pipeline] {
    mutate {
      add_field => { "[@metadata][pipeline]" => "%{[@metadata][beat]}-pipeline" }
    }
  }
}

output {
  #stdout { codec => json }

  if "metricbeat" in [@metadata][beat] {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      data_stream => "true"
      data_stream_type => "metrics"
      data_stream_dataset => "metricbeat.generic"
      data_stream_namespace => "default"
      pipeline => "%{[@metadata][pipeline]}"
      user => ""
      password => ""
    }
  }
}

Created the Elasticsearch ingest pipeline (metricbeat-pipeline):

PUT _ingest/pipeline/metricbeat-pipeline
{
  "processors": [
    {
      "script": {
        "description": "Prefix the existing dataset with 'metricbeat.'",
        "source": "ctx.event.dataset = 'metricbeat.' + ctx?.event?.dataset;",
        "if": "ctx?.event?.dataset != null"
      }
    },
    {
      "script": {
        "description": "Build the dataset from module and metricset (pre-ECS events)",
        "source": "ctx.event.dataset = 'metricbeat.' + ctx?.event?.module + '.' + ctx?.metricset?.name;",
        "if": "ctx?.event?.dataset == null"
      }
    },
    {
      "reroute": {
        "dataset": [
          "{{event.dataset}}"
        ]
      }
    }
  ]
}
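You can dry-run the scripts with the simulate API (depending on the version, simulate may not follow the reroute itself), and later confirm that the rerouted data streams exist:

# event.dataset "system.cpu" should come out as "metricbeat.system.cpu"
POST _ingest/pipeline/metricbeat-pipeline/_simulate
{
  "docs": [
    { "_source": { "event": { "dataset": "system.cpu", "module": "system" } } }
  ]
}

# Once real events arrive, the rerouted data streams should show up here
GET _data_stream/metrics-metricbeat.*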

Tested with Metricbeat versions 8.13.4, 7.17.21, 6.8.23, 5.6.16, and 5.0.0.

I had to add some compatibility handling for 5.0.0 in the Logstash and Elasticsearch ingest pipelines because it does not use the ECS schema (of course!).

I see a few differences where some versions do not ship all the system metrics needed for some standard dashboards (like Infrastructure).

Versions 7.x.x and 8.x.x seem to be working fine.

I will do another test setting up an Elastic Agent with the System integration to see if any field conflicts appear, and possibly change/fix them on the Metricbeat side (in the Logstash or Elasticsearch pipeline).
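The field caps API is a quick way to spot such conflicts once both shippers write to overlapping index patterns (the fields below are just examples):

# A field resolving to more than one type across indices is a mapping conflict
GET metrics-*/_field_caps?fields=agent.hostname,host.name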

Thanks,
Matheus