Sending data to new Data Stream with Elastic Agent

Hi, We are trying to use a standalone Elastic Agent on Kubernetes. Right now it is creating indexes for us and sending all container logs to them. The indexes end up looking like logs-kubernetes.container_logs-cluster-01, where "cluster-01" is the namespace we defined in the Elastic Agent. How do we find specific data and send it to its own index/namespace? We've tried to create a pipeline, like:

  agent.yml: |-
    outputs:
      default:
        type: elasticsearch
        hosts:
          - >-
            ${ES_HOST}
        api_key: ${ES_API_KEY}
        pipelines:
          - pipeline: "test_reroute_to_dev"
            when.contains:
              kubernetes.pod.name: "my-app"

And then in Elastic/Kibana we create an ingest pipeline like this, but nothing seems to happen:

PUT _ingest/pipeline/test_reroute_to_dev
{
  "version": 2,
  "description": "test a reroute",
  "processors": [
    {
      "reroute": {
        "tag": "sending to new index",
        "namespace": "my-new-app-namespace"
      }
    }
  ]
}

Hi @wrender1

The easier (and better-practice) way to do that is just to use the existing @custom pipeline that is already configured for you to use... it is the perfect place to do the reroute; I just did this for another user.

Read this

Super important: in the naming scheme you cannot use a - in the namespace.

<type>-<dataset>-<namespace>

so your new name is

logs-kubernetes.container_logs-cluster.01

cluster.01 is your new namespace... or cluster_01, just not one with a -

You can do the reroute in the pipeline that is ready to be created and used:

"logs-kubernetes.container_logs@custom"

just create the pipeline and it will take effect immediately...

The namespace field of the reroute processor does not allow complex value assignment, so you might need to create/construct a field with a set processor and then use that field in the reroute.

Ok. Thanks for pointing out that we shouldn't have "-" in the namespace names.

I'm a little confused. If we have a bunch of container logs for various containers/pods all going to logs-kubernetes.container_logs-cluster.01, then how do we extract a certain container's/pod's logs and split them into another index? Are you able to provide more detail/examples on that? For example, we want to send all logs from one specific pod to a certain index, like kubernetes.pod.name: "my-app" all going to logs-kubernetes.container_logs-my_app, but all other containers logging to logs-kubernetes.container_logs-cluster.01. Does that mean we would not define the custom pipeline in agent.yml?

Well, first, I would ask why you want to put containers in their own data streams... at scale that will typically not be good... 1000s of data streams, when they can always simply be filtered at any place in the UI / API / alerts, etc...

not to mention container names often have - in them

There is often the "sense" to route data to specific indices... I usually advise new users against that until they have used Elastic for a while... and then make these decisions...

What are you actually trying to accomplish...

Yes, you can do it through the UI... all it does is create the ...@custom ingest pipeline through the UI... which just ends up doing the same thing as through the REST API... I use Kibana Dev Tools; I find it faster.

Here is a working reroute. You can enter this in

Kibana -> Dev Tools (it will take effect immediately... you can delete it and it will stop).

So there you go... but use with caution

PUT _ingest/pipeline/logs-kubernetes.container_logs@custom
{
  "processors": [
    {
      "set": {
        "field": "new_namespace",
        "value": "{{kubernetes.container.name}}"
      }
    },
    {
      "gsub": {
        "field": "new_namespace",
        "pattern": "-",
        "replacement": "_"
      }
    },
    {
      "reroute": {
        "dataset": "{{data_stream.dataset}}",
        "namespace": "{{new_namespace}}"
      }
    }
  ]
}

Same as doing it through the UI.

Now I have a bunch of data streams I don't want... or wait, I can just take out the pipeline or take out the reroute and it will stop routing 🙂

Thanks, but no, that’s not what I’m trying to do. We have a requirement:

  • put all container logs from the kubernetes cluster into one index except…
  • put all the container logs from one particular pod that match a certain message pattern into another index, and apply an index lifecycle policy to that index to store those logs for a longer period of time

Yup, that makes sense... I had another user that had some specific messages that needed longer retention as well... so you are not alone.

So, setting a different ILM policy can be a bit trickier... because you cannot set an ILM policy in an ingest pipeline...

So you will need to clone a template; here are the macro steps:

Create the new ILM Policy you want for these special logs

Clone template 'logs-kubernetes.container_logs'

Name it with your full new name (I would not necessarily use the container name, but whatever makes it unique), like

'logs-kubernetes.container_logs-special_logs'

KEY: set the index pattern so it matches your new specific data stream name; it needs to be more specific than the defaults

Then set the priority higher... like 300

Then set the new ILM policy

Here are some screen caps ... this was for -botique but you should get the picture...
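
In Dev Tools, those macro steps look roughly like the sketch below (the policy name, retention values, and namespace here are placeholders, and a real clone, whether done via the UI or the API, should also carry over the mappings and component templates of the original logs-kubernetes.container_logs template):

PUT _ilm/policy/special_logs_policy
{
  "policy": {
    "phases": {
      "hot": { "actions": { "rollover": { "max_primary_shard_size": "50gb", "max_age": "30d" } } },
      "delete": { "min_age": "365d", "actions": { "delete": {} } }
    }
  }
}

PUT _index_template/logs-kubernetes.container_logs-special_logs
{
  "index_patterns": ["logs-kubernetes.container_logs-special_logs"],
  "priority": 300,
  "data_stream": {},
  "template": {
    "settings": {
      "index.lifecycle.name": "special_logs_policy"
    }
  }
}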

Ok. Yes. I already have the ILM policy set up and working fine. Thanks though.

What I don’t know how to do and hoping someone can clarify is:

  • Send the logs from one pod/event matching a certain message pattern to a separate index
  • Send all of the other logs from all other pods to the default one it creates with the Elastic Agent. (It’s already doing this by default.)

I don’t see how your examples above show this at all. Are you able to clarify how you could match a pattern or pod name and send just that output alone to another index?

You need to learn about ingest pipelines and conditionals.

The reroute will be based on your conditional

The conditional will be based on matching the value of a certain field against a regex.

Pretty simple. Just get in there and play with a set processor, a conditional and a regex...

There is a regex example in there, and make sure you read the caution: don't use expensive regexes.

It might be easier to parse that message and then check a field anyway; there are lots of ways to do it.

You need to get in there. Roll up your sleeves, just create an if with a set on a field, and test until you get it the way you like it.

Then apply the reroute.

Not to make it complicated, but you could actually just have your custom pipeline have the if with the regex... and then call another pipeline that has the routing in it...

In other words, think of ingest pipelines as composable code
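
Something like this is what I mean by composable (just a sketch, untested; the route_special_logs pipeline name, the special_logs namespace, the pod name, and the KEEP-LONGER string are all placeholders for whatever your real condition is):

PUT _ingest/pipeline/route_special_logs
{
  "processors": [
    {
      "reroute": {
        "dataset": "{{data_stream.dataset}}",
        "namespace": "special_logs"
      }
    }
  ]
}

PUT _ingest/pipeline/logs-kubernetes.container_logs@custom
{
  "processors": [
    {
      "pipeline": {
        "name": "route_special_logs",
        "if": "ctx.kubernetes?.pod?.name == 'my-app' && ctx.message != null && ctx.message.contains('KEEP-LONGER')"
      }
    }
  ]
}

Documents that do not match the if simply skip the pipeline processor and keep flowing to the default data stream.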

Are you sure you can’t apply the pipeline and conditional on the elastic agent config side like I showed at the start of this discussion?

In the documentation it says: "Alternatively, you can specify the pipeline policy setting in your elastic-agent.yml configuration. See Install standalone Elastic Agents."

Well, test it out... Let us know...

Are you using standalone or fleet managed?

It's not really the approach I'm taking... What I gave you is a manageable, scalable approach... which can literally be set up in about 5 minutes so you can start testing...

You can test the ingest pipeline using the _simulate API too...
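
For example, something along these lines (the sample document is made up just to exercise your condition):

POST _ingest/pipeline/logs-kubernetes.container_logs@custom/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "something worth keeping longer",
        "data_stream": { "type": "logs", "dataset": "kubernetes.container_logs", "namespace": "cluster.01" },
        "kubernetes": { "pod": { "name": "my-app" } }
      }
    }
  ]
}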

I find debugging the agent.yml much harder than just the custom pipelines...

That said, there's always more than one way to do things..

I think it may be possible but your syntax is most likely not correct...

I don't know what the correct syntax would be off hand

I do think there's a conditional pipeline setting, but it needs to go with an index or something to that effect. What effect that has on the data stream name I am not sure...

It's using Filebeat somewhat under the covers...

Maybe look at this.

Ohh, putting the pipeline on the agent does not solve the template and ILM portion of the solution. You'll still need to do that.

If I'm not wrong, setting the pipeline in the yml file for the agent will make Elasticsearch use that pipeline to parse your data instead of the default integration pipeline that already has the filters to parse the data.

This is not the same thing as adding a custom pipeline that will be the last one in the processing of your logs.
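
Roughly speaking, the managed integration pipeline ends with a processor like this (a simplified sketch; the real managed pipeline has all the parsing processors before it), which is why the @custom pipeline runs after the normal parsing instead of replacing it:

{
  "pipeline": {
    "name": "logs-kubernetes.container_logs@custom",
    "ignore_missing_pipeline": true
  }
}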

Uhhh Yes!! Excellent Point... I was not even thinking about THAT doh!

The more I think about it, setting a pipeline in the output section of agent.yml is most likely fraught with side effects... especially if you are using multiple integrations, etc., etc.

Ok. Yes. We are using the standalone Elastic Agent deployed to Kubernetes as a DaemonSet. Is there a way to see the default integration pipelines? I would ideally like to set the pipeline and conditional on the agent if that is possible.

If going the @custom pipeline route. Does anyone have an example of adding a custom pipeline that does a pattern match in the message field, and then sends those events to a new index, and if it doesn’t match just send the event to the default indexes?

This can change for each dataset of each integration. For example, the Google Workspace integration has 14 datasets. You can see them in Kibana, going to Stack Management > Ingest Pipelines.

If I'm not wrong, the ingest pipelines for the integrations have the following naming scheme:

<type>-<dataset>-<integration_version>

So for the example of the google workspace you will have 14 default ingest pipelines, like this one:

logs-google_workspace.admin-2.19.2 

So if you have different integrations on the Elastic Agent, you would need to specify a pipeline for each dataset of each integration in your yml. I'm not sure if it is possible to specify this dynamically. The standalone agent is an advanced case; not many people use it and the documentation is not that deep, so you will need to test it yourself.

Stephen already shared an example. You basically just need a reroute processor with a conditional; there is an example of it in the documentation for the reroute processor.
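
It would be something along these lines (just a sketch, not tested; the pod name, the KEEP-LONGER string, and the special_logs namespace are placeholders, and you can swap the contains() for a =~ regex if Painless regex support is enabled):

PUT _ingest/pipeline/logs-kubernetes.container_logs@custom
{
  "processors": [
    {
      "reroute": {
        "if": "ctx.kubernetes?.pod?.name == 'my-app' && ctx.message != null && ctx.message.contains('KEEP-LONGER')",
        "dataset": "{{data_stream.dataset}}",
        "namespace": "special_logs"
      }
    }
  ]
}

Anything that does not match the condition skips the reroute and keeps going to the default logs-kubernetes.container_logs-cluster.01 data stream.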

We don't have Fleet set up with this Elasticsearch cluster, as we wanted to go with the standalone Elastic Agent. When I go to Stack Management > Ingest Pipelines I don't see anything related to my datasets. Should these ingest pipelines be created dynamically for us, or do I have to create them manually? Is it maybe because we don't have Fleet installed?

Did you follow this documentation?

Step 4 mentions that you need to make sure that the assets, like dashboards and ingest pipelines, are set up in Kibana and Elasticsearch.

You can install the needed assets by going into Kibana > Integrations, selecting the integration you want to use, and then installing its assets. You do not need to have Fleet to install the assets.

This is explained in this documentation.

Thanks. It looks like we are missing those dashboards and ingest pipelines. When I go to Management in Kibana, there is no "Integrations" menu option.

Are these ingest pipelines a licensed feature? We are running an unlicensed Elastic cluster at the moment, so that could explain why none of this is working.

Integrations are not a commercial feature in general.

So, my findings: Integrations are available with

a Basic license, security enabled, and Kibana encryption keys (which are just GUIDs in kibana.yml; just run the command-line tool and it prints out the settings).
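
For reference, the kibana.yml settings involved look something like this (the values below are placeholders; the command-line tool generates real ones):

# kibana.yml - placeholder values, generate your own
xpack.encryptedSavedObjects.encryptionKey: "a-random-string-of-at-least-32-characters"
xpack.security.encryptionKey: "another-random-string-of-at-least-32-characters"
xpack.reporting.encryptionKey: "yet-another-random-string-of-at-least-32-characters"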

You do not need Fleet installed (I do not have it on my test).

Screenshots:

Ok. Interesting. I don't see "License management" or "Integrations" in this cluster, so I have a feeling security unfortunately was not considered when this cluster was set up.

On a side note, we were able to resolve this on the Elastic Agent standalone side of things. We ended up just needing to create a filestream input with a custom id and namespace, and then add a conditional to look for our pod name like this (below). It then sends that data to a new index and we can filter with any processors we need.

I have a feeling that we will need to use pipelines in the future though, especially if that is the direction things are going, so thanks for the information on the licensing. We'll have to figure that out.

- id: kubernetes-my-custom-app-logs
  type: filestream
  use_output: default
  meta:
    package:
      name: kubernetes
      version: 1.52.0
  data_stream:
    namespace: my_custom_app_namespace_index
  streams:
    - data_stream:
        dataset: kubernetes.container_logs
        type: logs
      id: kubernetes-myapp-${kubernetes.pod.name}-${kubernetes.container.id}
      parsers:
        - container: null
      prospector.scanner.symlinks: true
      condition: stringContains(${kubernetes.pod.name}, "myapp") == true
      paths:
        - /var/log/containers/*${kubernetes.container.id}.log