Custom Log integration with multiple pipelines

I have a custom integration set up with Elastic Agent. At the moment I have a directory with several log files containing quite different data. I can't use the same pipeline for all of them, so I would like to reference another pipeline. I am happy for all of these to share the same data stream, even if some fields will be different.

At the moment in the custom log integration I simply chose:
pipeline: logs-ams-worker

This processes the worker logs and all is well. I also need to process a different log file with a different pipeline, e.g. logs-ams-node.

Is it possible, in the custom configuration where I have my pipeline declared, to have a condition: a distributor, an if statement, or some other method? Something like:

if doc['source'].value == 'logs-ams-worker.log'
else if doc['source'].value == 'logs-ams-node.log'
else if doc['source'].value == 'logs-ams-other.log'

Hi @searchtastic

Typically you would create a top-level pipeline that then calls the sub-pipelines... See here

This is a powerful way to control pipeline flow and build component pipelines that can be reused.
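As a rough sketch, such a top-level pipeline could hold one pipeline processor per log type, each guarded by a condition (the router pipeline name here is invented; the sub-pipeline names come from this thread):

```json
PUT _ingest/pipeline/logs-ams-router
{
  "description": "Route each document to a sub-pipeline based on its source file",
  "processors": [
    {
      "pipeline": {
        "if": "ctx.log.file.path.contains('worker')",
        "name": "logs-ams-worker"
      }
    },
    {
      "pipeline": {
        "if": "ctx.log.file.path.contains('node')",
        "name": "logs-ams-node"
      }
    }
  ]
}
```

You would then point the custom log integration at logs-ams-router instead of a concrete pipeline, and each sub-pipeline runs only when its condition matches.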

@stephenb Thanks for the help.

So I guess I could use something like the code below; would that work on the file /opt/ams/logs/worker_2.01.log?

{
   "pipeline": {
      "if": "ctx.log.file.path.contains('worker')",
      "name": "logs-ams-worker"
   }
},
{
   "pipeline": {
      "if": "ctx.log.file.path.contains('node')",
      "name": "logs-ams-node"
   }
}

I then need to figure out how to make just two of the four pipelines handle multiline input; ideas welcome.

Thanks again.

You know, since the sorting is path based, you could just create 2 (or N) custom Logs integrations with the paths/patterns and then just call the exact pipeline...

For multiline, that happens before the pipeline: you need to put it in the integration using the multiline syntax (actually the legacy syntax); see here...
Make sure to use the Log input syntax.

Using log input:

multiline.type: pattern
multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
multiline.negate: false
multiline.match: after
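For illustration, this pattern matches indented `at ...` / `...` frames and `Caused by:` lines; with negate: false and match: after, matching lines are appended to the preceding event, so an invented Java stack trace like this would ship as a single message:

```
2022-01-10 15:47:54,757 ERROR worker request failed
	at com.example.Worker.run(Worker.java:42)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: connection reset
	at com.example.Net.read(Net.java:17)
```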


ProTip... multiline can be challenging. I just install a quick Filebeat with the log input to test... but that is just me.

Here is a really nice article on all this ... we need to make our docs this good :slight_smile:


I really liked your idea of using a single pipeline in the integration that calls sub-pipelines, although it doesn't seem to be working for me.

Do you know if it's possible to pass log.file.path within the POST command to test the pipeline,

POST _ingest/pipeline/worker/_simulate
{
  "docs": [
  {
    "_source": {
      "ctx.log.file.path": "/opt/logs/worker.debug.log",
      "message": "2022-01-10 15:47:54,757 INFO supervisord worker process"
    }
  }
]   
}

so that I have a way of testing whether the pipeline is called?

        "script_stack" : [
          "ctx.log.file.path.contains('worker')",
          "       ^---- HERE"
        ],
        "script" : "ctx.log.file.path.contains('worker')",
        "lang" : "painless",
        "position" : {
          "offset" : 7,
          "start" : 0,
          "end" : 45
        },
        "caused_by" : {
          "type" : "null_pointer_exception",
          "reason" : "cannot access method/field [file] from a null def reference"

Thanks again.

It helps if you actually show the pipeline, not just the error, but I suspect you need null-safety checks...

From right above in the doc I linked:

Incoming documents often contain object fields. If a processor script attempts to access a field whose parent object does not exist, Elasticsearch returns a NullPointerException. To avoid these exceptions, use null safe operators, such as ?., and write your scripts to be null safe.

For example, ctx.network?.name.equalsIgnoreCase('Guest') is not null safe. ctx.network?.name can return null. Rewrite the script as 'Guest'.equalsIgnoreCase(ctx.network?.name), which is null safe because Guest is always non-null.

If you can’t rewrite a script to be null safe, include an explicit null check.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}

Ohh, and this is not correct: ctx means the context of the document in the Painless script in the processor... you prepend it to the field in the conditional statements...

Don't prepend it to the field names in the simulate etc.

if condition scripts run in Painless’s ingest processor context. In if conditions, ctx values are read-only.

"_source": {
  "ctx.log.file.path": "/opt/logs/worker.debug.log",   <---- NOT correct
  "message": "2022-01-10 15:47:54,757 INFO supervisord worker process"
}

"_source": {
  "log.file.path": "/opt/logs/worker.debug.log",   <---- Correct
  "message": "2022-01-10 15:47:54,757 INFO supervisord worker process"
}
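One hedged caveat on the simulate test: a dotted key in _source can be kept as a single flat field literally named log.file.path, in which case ctx.log?.file?.path may still resolve to null. Writing the test document with nested objects, which mirrors how the Agent actually ships the field, avoids that ambiguity:

```json
POST _ingest/pipeline/worker/_simulate
{
  "docs": [
    {
      "_source": {
        "log": {
          "file": {
            "path": "/opt/logs/worker.debug.log"
          }
        },
        "message": "2022-01-10 15:47:54,757 INFO supervisord worker process"
      }
    }
  ]
}
```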

Thank you Stephen for the help.

So the pipeline works well with the following:

 {
         "pipeline":{
            "if":"ctx.log.file.path.contains('worker')",
            "name":"logs-ams-worker"
         }
      },
      {
         "pipeline":{
            "if":"ctx.log.file.path.contains('node')",
            "name":"logs-ams-node"
         }
      }
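Given the NullPointerException earlier in the thread, a null-safe variant of those conditions would also tolerate documents that arrive without a log.file.path:

```json
{
   "pipeline": {
      "if": "ctx.log?.file?.path != null && ctx.log.file.path.contains('worker')",
      "name": "logs-ams-worker"
   }
},
{
   "pipeline": {
      "if": "ctx.log?.file?.path != null && ctx.log.file.path.contains('node')",
      "name": "logs-ams-node"
   }
}
```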

The multiline is being problematic, so I think I will go back to multiple integrations. In my case that means I will be running six of these custom integrations for one server type, which seems excessive, but life is too short.

Thanks again.

Glad you got it working!

Perhaps provide more details... multiline is always a bit more challenging... but to be clear, multiline assembly happens on the collection side, in the Agent, not in the ingest pipeline...