Output condition filter on filebeat module

Hi, I have a Kafka server with two Filebeat modules enabled, kafka and system; the Filebeat config also includes a "kafka" tag.

On my Logstash server I use multiple pipelines. What I'm trying to do is separate the Kafka logs from the system logs into two different indices.

Here is my Logstash pipeline config:

input {
  beats {
    port => 5044
  }
}
output {
  if [module] == "system" {
    pipeline { send_to => "logs-system-processing" }
  }
  if "kafka" in [tags] {
    pipeline { send_to => "logs-kafka-processing" }
  }
  stdout { codec => rubydebug { metadata => true } }
}
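
For context on the multi-pipeline setup: each send_to address above is backed by a downstream pipeline that has a matching pipeline input. Here is a minimal sketch of the receiving side for the system branch (the elasticsearch output and the index name are assumptions for illustration, not part of my actual setup):

# logs-system-processing pipeline (registered in pipelines.yml)
input {
  pipeline { address => "logs-system-processing" }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]    # assumption: local Elasticsearch
    index => "logs-system-%{+YYYY.MM.dd}" # assumption: illustrative index name
  }
}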

My kafka index is correctly populated, but nothing is going into my system index.

What is the correct syntax for my system condition to make it work?

I tried many different approaches found on this forum, but none of them worked.

If your events had a [module] field and it was equal to "system", then that condition would work. So the question is what an event actually looks like. Can you show the rubydebug output for an event sent by the system module?

OK, I did not configure the fields option to add a module attribute with the value system, and I don't want to do it that way because the field would also end up on the Kafka logs.

From what I saw, there is no way to add specific fields or tags directly in the Filebeat module config.

Are there other options than tags or fields for doing conditional filtering on the Logstash output?

Here is an example of the rubydebug output I get:

{
    "agent" => {
        "hostname" => "PIXID-KAFKA3",
        "id" => "4f0c403b-c4af-41a3-8ba6-47d9a929066f",
        "ephemeral_id" => "107507ec-94fe-4f94-86e1-569843901b69",
        "type" => "filebeat",
        "version" => "7.7.1"
    },
    "log" => {
        "file" => {
            "path" => "/var/log/messages"
        },
        "offset" => 11986996
    },
    "@metadata" => {
        "version" => "7.7.1",
        "beat" => "filebeat",
        "ip_address" => "10.10.9.153",
        "type" => "_doc",
        "pipeline" => "filebeat-7.7.1-system-syslog-pipeline"
    },
    "fileset" => {
        "name" => "syslog"
    },
    "message" => "Jun 17 15:38:54 pixid-kafka3 filebeat: ],",
    "tags" => [
        [0] "kafka",
        [1] "beats_input_codec_plain_applied"
    ],
    "input" => {
        "type" => "log"
    },
    "@timestamp" => 2020-06-17T15:38:55.469Z,
    "ecs" => {
        "version" => "1.5.0"
    },
    "service" => {
        "type" => "system"
    },
    "host" => {
        "hostname" => "PIXID-KAFKA3",
        "os" => {
            "kernel" => "3.10.0-957.27.2.el7.x86_64",
            "codename" => "Core",
            "name" => "CentOS Linux",
            "family" => "redhat",
            "version" => "7 (Core)",
            "platform" => "centos"
        },
        "containerized" => false,
        "ip" => [
            [0] "10.10.9.153"
        ],
        "name" => "PIXID-KAFKA3",
        "id" => "709befdd51334c5bacdcf096a8afd4ee",
        "mac" => [
            [0] "00:50:56:b3:2f:cc"
        ],
        "architecture" => "x86_64"
    },
    "@version" => "1",
    "fields" => {
        "env" => "rct1"
    },
    "event" => {
        "timezone" => "+00:00",
        "module" => "system",
        "dataset" => "system.syslog"
    }
}

OK, I reread the documentation and now better understand how to reference fields in a pipeline and what the right syntax is:

https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html
https://www.elastic.co/guide/en/logstash/current/field-references-deepdive.html
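
In the field-reference terms of those docs, [event][module] addresses the module key nested inside the event object, while the original [module] addressed a top-level field that these events simply do not have. A minimal sketch of the difference (the tag names are illustrative, not from my config):

filter {
  # Never fires: these events have no top-level "module" field.
  if [module] == "system" {
    mutate { add_tag => ["matched_top_level_module"] }
  }
  # Fires for system-module events: the module name is nested under "event".
  if [event][module] == "system" {
    mutate { add_tag => ["matched_event_module"] }
  }
}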

Here is my new output:

output {
  if [agent][type] == "filebeat" {
    if [event][module] == "system" {
      pipeline { send_to => "logs-system-processing" }
    }
  }
}
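
For completeness, the kafka branch can be routed the same way. Here is a sketch under the assumption that the Filebeat kafka module sets [event][module] to "kafka" (the original tags condition would also keep working):

output {
  if [agent][type] == "filebeat" {
    if [event][module] == "system" {
      pipeline { send_to => "logs-system-processing" }
    }
    # Assumption: the Filebeat kafka module sets [event][module] to "kafka".
    if [event][module] == "kafka" {
      pipeline { send_to => "logs-kafka-processing" }
    }
  }
}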
