Logstash multiple pipeline Openshift/kubernetes no traffic

EDIT

I finally get traffic into the cluster, but I only receive logs that contain the string "FMC_AUDIT_LOG"; nothing else gets past. If I receive a syslog message with the string "test", it gets dropped. I want such messages to be forwarded to the "hpeilo" pipeline. Is this possible?

Hello team,

Platform: OpenShift 4.12.4

I'm in the process of collecting UDP syslog traffic with Logstash from different parts of our corporation. I have been able to create pipelines and ingest data when I use a single pipeline per Logstash pod. Now I'm implementing a multiple-pipeline configuration for my Logstash pods.

I have created all the configuration files (deployment, configmap, service, Logstash pipelines), but I'm not able to receive any logs when I use the multiple pipelines feature of Logstash on OpenShift.

Below are my configuration files. I have been on this task for approximately 3 days without success, so this is my last chance to get some help, I guess.

All the files are correctly mounted in the pod, and I have verified that the pod is up and running. The logs tell me that Logstash is ready to receive traffic on the specified IPs/ports.
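To rule out the client side, a small script like the following can exercise a UDP syslog port end to end (a minimal sketch: a local socket stands in for the Logstash listener here, and the host/port are placeholders for the real LoadBalancer address):

```python
import socket

# Placeholders: in the cluster this would be the LoadBalancer IP and
# port 514 (or 1514 on the container itself).
HOST, PORT = "127.0.0.1", 1514

# Local UDP listener standing in for the Logstash input, so the sketch
# is self-contained and can be run anywhere.
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind((HOST, PORT))
listener.settimeout(2)

# Send a syslog-style datagram the same way a network device would.
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.sendto(b"<134>Mar 15 16:49:52 host1 app: FMC_AUDIT_LOG test", (HOST, PORT))

data, _ = listener.recvfrom(4096)
print(data.decode())

sender.close()
listener.close()
```

If the datagram arrives locally but never shows up in Logstash, the problem is somewhere between the service and the pod rather than in the pipeline configuration.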

Deployment

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.logstash_fmc.name }}
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: eck-logstash
      app.kubernetes.io/component: logstash
  template:
    metadata:
      labels:
        app.kubernetes.io/name: eck-logstash
        app.kubernetes.io/component: logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:{{ .Values.version }}
          ports:
            - name: "udp"
              containerPort: 1514
              protocol: UDP
            - name: "ilo"
              containerPort: 1515
              protocol: UDP
          env:
            - name: ES_HOSTS
              value: "https://{{ .Values.name }}-es-ingest.{{ .Values.namespace }}.svc:9200"
            - name: ES_USER
              value: "elastic"
            - name: ES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ .Values.name }}-es-elastic-user
                  key: elastic
          volumeMounts:
            - name: base-config
              mountPath: /usr/share/logstash/config/logstash.yml
              subPath: logstash.yml
            - name: pipeline-config
              mountPath: /usr/share/logstash/config/pipelines.yml
              subPath: "pipelines.yml"
            - name: ciscofmc-config
              mountPath: /usr/share/logstash/pipeline/ciscofmc.yml
              subPath: "ciscofmc.yml"
            - name: hpeilo-config
              mountPath: /usr/share/logstash/pipeline/hpeilo.yml
              subPath: "hpeilo.yml"
            - name: ca-certs
              mountPath: /etc/logstash/certificates
              readOnly: true
            - name: tls-key
              mountPath: /etc/logstash/certificates/key
              readOnly: true
      volumes:
        - name: base-config
          configMap:
            name: logstash-base-config
        - name: pipeline-config
          configMap:
            name: logstash-pipeline-config
        - name: ciscofmc-config
          configMap:
            name: logstash-ciscofmc-config
        - name: hpeilo-config
          configMap:
            name: logstash-hpeilo-config
        - name: ca-certs
          secret:
            secretName: {{ .Values.name }}-es-http-certs-public
        - name: tls-key
          secret:
            secretName: {{ .Values.name }}-es-http-private-key

configmaps

logstash.yml

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-base-config
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
data:
  logstash.yml: |
    http.host: "0.0.0.0"
    path.config: /usr/share/logstash/pipeline

pipeline number 1

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-ciscofmc-config
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
data:
  ciscofmc.yml: |
        input {
          udp {
            port => 1514
            type => "syslog"
          }
        }

        filter {
          if "FMC_AUDIT_LOG" not in [message] {
            drop {}
          } else if "abc.com" not in [message] {
            grok {
              match => { "message" => '<%{INT:logsyslogfacilitycode}>%{SYSLOGTIMESTAMP:timestamp}\s%{IPORHOST:sysloghost}\s%{DATA:eventcode}:\s\[%{USER:eventprovider}\]\s%{HOSTNAME:sourcehostname}:\s%{USERNAME:ciscofmcsource_username}@(?<sourceip>(?<=@)[^,]+)%{GREEDYDATA:data}' }
              add_tag => ["local_user"]
            }
          } else if "abc.com" in [message] {
            grok {
              match => {
                "message" => '<%{INT:logsyslogfacilitycode}>%{SYSLOGTIMESTAMP:timestamp}\s%{IPORHOST:sysloghost}\s%{DATA:eventcode}:\s\[%{USER:eventprovider}\]\s%{HOSTNAME:sourcehostname}:\s%{EMAILADDRESS:ciscofmcsource_username}@(?<sourceip>(?<=@)[^,]+)%{GREEDYDATA:data}'
              }
              add_tag => ["AD_user"]
            }
          } else {
            # Note: this branch is unreachable, since the two branches above
            # already cover every message containing FMC_AUDIT_LOG. Tagging
            # real parse failures would require checking for
            # "_grokparsefailure" in [tags] instead.
            mutate {
              add_tag => ["grok_failed"]
            }
          }

          date {
            match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
          }

          mutate {
            split => { "data" => ", "}

            add_field => { "[event][type]" => "%{[data][1]}" }
            add_field => { "[event][dataset]" => "%{[data][2]}" }
          }

          mutate {
            rename => {
              "logsyslogfacilitycode" => "[log][syslog][facility][code]"
              "sysloghost" => "[cisco][fmc][mapped_source_host]"
              "eventcode" => "[event][code]"
              "eventprovider" => "[event][provider]"
              "ciscofmcsource_username" => "[cisco][fmc][source_username]"
              "sourcehostname" => "[host][name]"
              "sourceip" => "[source][ip]"
            }
            remove_field => [ "data", "timestamp", "message", "app", "[host][ip]", "type"]
          }
        }
        output {
          elasticsearch {
            hosts => [ "${ES_HOSTS}" ]
            user => "${ES_USER}"
            password => "${ES_PASSWORD}"
            cacert => '/etc/logstash/certificates/ca.crt'
            index => "logstash-beta-%{+YYYY.MM.dd}"
          }
        }

Pipeline number 2

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-hpeilo-config
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
data:
  hpeilo.yml: |
      input { 
        udp {
          port => 1515
          type => "syslog"
        }
      }

      filter {
        if [type] == "syslog" {
          grok {
            match => { "message" => "%{GREEDYDATA:message}" }
          }
          date {
            match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
          }
        } else {
          drop {}
        }
      }

      output {
        elasticsearch {
          hosts => [ "${ES_HOSTS}" ]
          user => "${ES_USER}"
          password => "${ES_PASSWORD}"
          cacert => '/etc/logstash/certificates/ca.crt'
          index => "logstash-ilo-%{+YYYY.MM.dd}"
        }
      }

pipeline configuration

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-pipeline-config
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
data:
  pipelines.yml: |
    - pipeline.id: hpeilo
      path.config: "/usr/share/logstash/pipeline/hpeilo.yml"
    - pipeline.id: ciscofmc
      path.config: "/usr/share/logstash/pipeline/ciscofmc.yml"

service to expose logstash

---
apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.logstash_fmc.name }}
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
  annotations:
    metallb.universe.tf/address-pool: elastic-udp-pool
    metallb.universe.tf/allow-shared-ip: "elastic-agent-svc"
spec:
  ports:
  - name: udp
    port: 514
    protocol: UDP
    targetPort: 1514
  - name: ilo
    port: 1515
    protocol: UDP
    targetPort: 1515
  selector:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
  type: LoadBalancer
  loadBalancerIP: {{ .Values.loadbalancer.ip }}

Message from logstash that verifies that it listens on the port/ips

[2023-03-15T16:49:52,721][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>1.44}
[2023-03-15T16:49:52,734][INFO ][logstash.inputs.beats    ][main] Starting input listener {:address=>"0.0.0.0:5044"}
[2023-03-15T16:49:52,738][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-03-15T16:49:52,750][INFO ][logstash.inputs.udp      ][main][ebc35bd884ceda15dd75e5a057642d0c2d7991e493476a2d2a9a058cfd2189e1] Starting UDP listener {:address=>"0.0.0.0:1515"}
[2023-03-15T16:49:52,750][INFO ][logstash.inputs.udp      ][main][117808a6c51bbca97c49ca64d82cdf05094ebcffd5cb3f779b388866a628d785] Starting UDP listener {:address=>"0.0.0.0:1514"}
[2023-03-15T16:49:52,752][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-03-15T16:49:52,756][INFO ][logstash.inputs.udp      ][main][ebc35bd884ceda15dd75e5a057642d0c2d7991e493476a2d2a9a058cfd2189e1] UDP listener started {:address=>"0.0.0.0:1515", :receive_buffer_bytes=>"106496", :queue_size=>"2000"}
[2023-03-15T16:49:52,756][INFO ][logstash.inputs.udp      ][main][117808a6c51bbca97c49ca64d82cdf05094ebcffd5cb3f779b388866a628d785] UDP listener started {:address=>"0.0.0.0:1514", :receive_buffer_bytes=>"106496", :queue_size=>"2000"}
[2023-03-15T16:49:52,807][INFO ][org.logstash.beats.Server][main][62a2a7efa34d2b9f8dd037df16ce70f8bcbb3600b2768a81bf12034960e5827e] Starting server on port: 5044

Your Logstash is not running multiple pipelines; it is running just one pipeline, the main pipeline, as you can see in these lines:

[2023-03-15T16:49:52,738][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}

And

[2023-03-15T16:49:52,752][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

It merged all the files inside /usr/share/logstash/pipeline/ and started them as just one pipeline. You probably have other files in this path, since the logs show a beats input that is not present in the files you shared for pipelines 1 and 2.

Try editing your logstash.yml file and removing the path.config line, since you want to use pipelines.yml. If I'm not wrong, when path.config is set in logstash.yml it takes precedence and the pipelines.yml file will not be used.
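For example, the base configmap would end up looking something like this (the same file you posted above, with only the path.config line dropped):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-base-config
  namespace: {{ .Values.namespace }}
  labels:
    app.kubernetes.io/name: eck-logstash
    app.kubernetes.io/component: logstash
data:
  logstash.yml: |
    http.host: "0.0.0.0"
```

With path.config gone, Logstash should fall back to pipelines.yml and start the hpeilo and ciscofmc pipelines separately.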

Hi @leandrojmp
That was the fix. Thank you for your input.

Now I'm getting another issue with the pipeline.

I'm using this Logstash pipeline-to-pipeline configuration.

        input {
          udp {
            port => 1514
          }
        }

        output {
          if [message] =~ /-ilo/ {
            pipeline {
              send_to => "hpeilo"
            }
          }
          else if [message] =~ /(abc\.com|abc\.net)/  {
            pipeline {
              send_to => "ciscofmc"
            }
          }
        }
  1. When I send a test syslog message, for example "This is a test", the first output ignores the message but the second ingests it.
  2. When I send a test syslog message containing the text "-ilo", the first output ingests the message, but so does the second.

The second output always ingests the message, even if it contains a string other than the value specified in the else if statement.

What am I missing here?

Thanks.

Hi again,

I updated the logstash configuration, it seems to work right now.

        input {
          udp {
            port => 1514
          }
        }

        output {
          if [message] =~ /-ilo/ {
            pipeline {
              send_to => "hpeilo"
            }
          }
          else if [message] =~ /FMC_AUDIT_LOG/ {
            pipeline {
              send_to => "ciscofmc"
            }
          }
        }
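One thing worth noting about this distributor output: events that match neither condition are silently discarded, because an event that reaches no output is simply acknowledged and dropped. If that is not the intention, a catch-all branch can forward the remainder (a sketch; the "fallback" pipeline address is hypothetical and would need its own entry in pipelines.yml):

```
        output {
          if [message] =~ /-ilo/ {
            pipeline { send_to => "hpeilo" }
          } else if [message] =~ /FMC_AUDIT_LOG/ {
            pipeline { send_to => "ciscofmc" }
          } else {
            # Hypothetical catch-all pipeline for anything unmatched.
            pipeline { send_to => "fallback" }
          }
        }
```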

This is my pipelines.yml

    - pipeline.id: sysloginput
      path.config: "/usr/share/logstash/pipeline/sysloginput.yml"
    - pipeline.id: hpeilo
      path.config: "/usr/share/logstash/pipeline/hpeilo.yml"
    - pipeline.id: ciscofmc
      path.config: "/usr/share/logstash/pipeline/ciscofmc.yml"

Then you need to specify the pipeline input in each downstream pipeline.

  input {
    pipeline {
      address => "hpeilo"
    }
  }

  filter {
    grok {
      match => { "message" => "%{GREEDYDATA:message}" }
    }
  }

Thank you, and hopefully this will help anybody else who has issues with the Logstash pipeline-to-pipeline configuration.
