Filebeat inputs configured to output to multiple logstash pipelines

We would like to be able to configure filebeat to route different log files to different logstash pipelines.

Is there a way to do this? As far as I can tell, since we define the logstash port in the main filebeat configuration file, all of the inputs for a single filebeat instance will share the same pipeline.

As some context / background, we have multiple service applications running on the same servers, managed by different teams but sharing the same elastic instance. Each of these applications has a different log format: text logging vs. structured logging, different timestamp formats, etc. Whilst we fully intend to normalize these formats and move to structured logging universally, in the short term we'd like to set up individual pipelines for each team so that they can own any specific post-processing for their log files.

Hello, I believe you can adapt the following logstash filter/output settings for your environment (I added the field 'app' via Filebeat before shipping to logstash):

Input filter:

filter {
  if [app] == "nginx-access" {
    grok {
      <operations>
    }
  }
  else if [app] == "nginx-error" {
    grok {
      <operations>
    }
  }
}

Output filter:

output {
  # Route on the custom 'app' field added in Filebeat
  if [app] == "nginx-access" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      document_id => "%{[@metadata][fingerprint]}"
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  }
  else if [app] == "nginx-error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      document_id => "%{[@metadata][fingerprint]}"
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }
  else if [app] == "postfix" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      document_id => "%{[@metadata][fingerprint]}"
      index => "maillog-%{+YYYY.MM.dd}"
    }
  }
  else {
    # Fallback: anything without a matching 'app' value goes to the default Beat index
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    }
  }
}
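
For reference, the 'app' field that the conditionals above test has to be set on the Filebeat side. A minimal sketch of how that might look, assuming Filebeat 6.3+ (`filebeat.inputs`), with hypothetical log paths and a hypothetical Logstash host/port:

```yaml
# Sketch only: paths and the Logstash host/port are assumptions, adjust to your environment.
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log
    fields:
      app: nginx-access
    # Put 'app' at the top level of the event, so Logstash sees [app]
    # rather than [fields][app].
    fields_under_root: true
  - type: log
    paths:
      - /var/log/nginx/error.log
    fields:
      app: nginx-error
    fields_under_root: true

output.logstash:
  hosts: ["localhost:5044"]
```

Without `fields_under_root: true` the custom field ends up nested under `fields`, and the Logstash conditionals would need to test `[fields][app]` instead.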

Hopefully this helps or at least gives you some ideas.

Regards,

Thanks Corey - this is the approach we've now taken.

If there is any chance of adding a feature in future to allow target pipelines to be specified per input that is something we definitely could make use of.

Regards,
Stuart

Stuart,

No problem. The configs I pasted for you were actually from my old setup.

I actually converted to Filebeat modules + Ingest pipelines to eliminate Logstash for the most part. You can basically do the same as above, it just requires some slight modifications for your purposes:

Filebeat output config:

output.elasticsearch:
  hosts: ["<eshost>:9200", "<eshost>:9200", "<eshost>:9200", "<eshost>:9200"]
  indices:
    - index: "syslog-%{[beat.version]}-%{+YYYY.MM.dd}"
      pipeline: filebeat-6.6.0-system-syslog-pipeline
      when.contains:
        fileset.name: "syslog"
    - index: "syslog-auth-%{[beat.version]}-%{+YYYY.MM.dd}"
      pipeline: filebeat-6.6.0-system-auth-pipeline
      when.contains:       
        fileset.name: "auth"  
    - index: "nginx-%{[beat.version]}-%{+YYYY.MM.dd}"
      pipeline: filebeat-6.6.0-nginx-access-default
      when.contains:
        event.dataset: "nginx.access"
    - index: "nginx-error-%{[beat.version]}-%{+YYYY.MM.dd}"
      pipeline: logs_pipeline 
      when.contains:
        event.dataset: "nginx.error"
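
When shipping straight to Elasticsearch, Filebeat inputs also accept a `pipeline` option, which may be a simpler way to tie one input to one ingest pipeline than conditions in the output. A rough sketch, where the paths and pipeline IDs are hypothetical and the pipelines are assumed to already exist in Elasticsearch:

```yaml
# Sketch only: paths and pipeline IDs are hypothetical.
filebeat.inputs:
  - type: log
    paths:
      - /var/log/team-a/*.log
    # Ingest pipeline ID applied to events read by this input
    pipeline: team-a-logs
  - type: log
    paths:
      - /var/log/team-b/*.json
    pipeline: team-b-logs

output.elasticsearch:
  hosts: ["<eshost>:9200"]
```

This keeps the routing decision next to the input definition, so each team can own its own input block and pipeline.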

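A trick from the docs you can use is a pipeline -> pipeline workflow. Look at the nginx-error entry in the output config above: it points to logs_pipeline, which uses pipeline processors to hand each event to the pipeline matching its event.dataset. I currently have it configured as:

Pipeline of pipelines (logs_pipeline):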

{
  "logs_pipeline" : {
    "description" : "A pipeline of pipelines for log files",
    "version" : 1,
    "processors" : [
      {
        "dot_expander" : {
          "field" : "event.dataset"
        }
      },
      {
        "pipeline" : {
          "if" : "ctx.event?.dataset == 'system.auth'",
          "name" : "filebeat-6.6.0-system-auth-pipeline"
        }
      },
      {
        "pipeline" : {
          "if" : "ctx.event?.dataset == 'system.syslog'",
          "name" : "filebeat-6.6.0-system-syslog-pipeline"
        }
      },
      {
        "pipeline" : {
          "if" : "ctx.event?.dataset == 'nginx.access'",
          "name" : "filebeat-6.6.0-nginx-access-default"
        }
      },
      {
        "pipeline" : {
          "if" : "ctx.event?.dataset == 'nginx.error'",
          "name" : "filebeat-6.6.0-nginx-error-pipeline"
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}

Nginx error pipeline:

{
  "filebeat-6.6.0-nginx-error-pipeline" : {
    "description" : "Pipeline for parsing the Nginx error logs",
    "processors" : [
      {
        "grok" : {
          "ignore_missing" : true,
          "field" : "message",
          "patterns" : [
            """%{DATA:nginx.error.time} \[%{DATA:nginx.error.level}\] %{NUMBER:nginx.error.pid}#%{NUMBER:nginx.error.tid}: (\*%{NUMBER:nginx.error.connection_id} )?%{GREEDYDATA:nginx.error.message}"""
          ]
        }
      },
      {
        "remove" : {
          "field" : "message"
        }
      },
      {
        "rename" : {
          "field" : "@timestamp",
          "target_field" : "read_timestamp"
        }
      },
      {
        "date" : {
          "field" : "nginx.error.time",
          "target_field" : "@timestamp",
          "formats" : [
            "YYYY/MM/dd H:m:s"
          ]
        }
      },
      {
        "set" : {
          "field" : "pipeline_processor",
          "value" : "filebeat-6.6.0-nginx-error-pipeline"
        }
      },
      {
        "remove" : {
          "field" : "nginx.error.time"
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}

I added this set processor to help with debugging and testing, so that I can tell which pipeline an event went through:

      {
        "set" : {
          "field" : "pipeline_processor",
          "value" : "filebeat-6.6.0-nginx-error-pipeline"
        }
      }

The key to getting these to work is ensuring that your index templates and field mappings are set up correctly. How to do that is outside the scope of this post, though.

You can also do some basic field enrichment by using Filebeat processors.
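
As an illustration of that kind of enrichment, a minimal sketch of a `processors` section in filebeat.yml (the choice of processors here is an assumption for the example, not something from my own config):

```yaml
# Sketch only: pick the processors that make sense for your environment.
processors:
  # Adds host details (name, OS, etc.) to every event
  - add_host_metadata: ~
  # Adds cloud provider metadata when running on a supported cloud instance
  - add_cloud_metadata: ~
```

Processors run in Filebeat before the event is shipped, so the added fields are available to whatever Logstash or ingest pipeline handles the event afterwards.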

I hope this helps.
