New pipelines are not loaded correctly

Hi,
In our ELK 7.6.2 setup, we configured 4 new pipelines, for a total of 8 pipelines:

    elkserver01# ls -la
    total 76
    drwxrwxr-x 10 logstash logstash  4096 Apr 22 14:18 .
    drwxr-xr-x  4 logstash logstash  4096 Apr 22 15:20 ..
    -rw-r--r--  1 logstash logstash    60 Apr 21 10:55 01-input.conf
    -rw-r--r--  1 logstash logstash   250 Aug 26  2016 01-input.conf.old
    -rw-r--r--  1 logstash logstash 19464 Apr 21 12:11 02-filter.conf
    -rw-r--r--  1 logstash logstash  1239 Apr 21 10:55 02-filter.conf.old
    -rw-r--r--  1 logstash logstash  1682 Apr 22 14:18 03-output.conf
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:11 app
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 13:55 kubernetes-internal
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:07 kubernetes-ourbusinessname2
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:07 kubernetes-ourbusinessname
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:07 kubernetes-ourbusinessnamebusiness
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:18 security
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:03 stores
    drwxr-xr-x  2 logstash logstash  4096 Apr 22 14:06 system

The pipeline configuration file includes all of them:

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# Each entry below pairs a pipeline.id with the glob of .conf files that make up that pipeline.
- pipeline.id: stores
  path.config : "/etc/logstash/conf.d/stores/*.conf"
# NOTE(review): the directory listing on this page shows no conf.d/kubernetes directory,
# and this file defines 9 ids where the post counts 8 - confirm this glob matches any files.
- pipeline.id: kubernetes
  path.config : "/etc/logstash/conf.d/kubernetes/*.conf"
- pipeline.id: kubernetes-internal
  path.config : "/etc/logstash/conf.d/kubernetes-internal/*.conf"
- pipeline.id: kubernetes-ourbusinessname2
  path.config : "/etc/logstash/conf.d/kubernetes-ourbusinessname2/*.conf"
- pipeline.id: kubernetes-ourbusinessname
  path.config : "/etc/logstash/conf.d/kubernetes-ourbusinessname/*.conf"
- pipeline.id: kubernetes-ourbusinessnamebusiness
  path.config : "/etc/logstash/conf.d/kubernetes-ourbusinessnamebusiness/*.conf"
- pipeline.id: app
  path.config : "/etc/logstash/conf.d/app/*.conf"
- pipeline.id: system
  path.config : "/etc/logstash/conf.d/system/*.conf"
# The security pipeline is loaded the same way as the others; if one pipeline fails to be
# created, check the log for a failed Create action naming its id.
- pipeline.id: security
  path.config : "/etc/logstash/conf.d/security/*.conf"

The main logstash settings file contains no special changes:

    # Settings file in YAML
    #
    # Settings can be specified either in hierarchical form, e.g.:
    #
    #   pipeline:
    #     batch:
    #       size: 125
    #       delay: 5
    #
    # Or as flat keys:
    #
    #   pipeline.batch.size: 125
    #   pipeline.batch.delay: 5
    #
    # ------------  Node identity ------------
    #
    # Use a descriptive name for the node:
    #
    # node.name: test
    #
    # If omitted the node name will default to the machine's host name
    #
    # ------------ Data path ------------------
    #
    # Which directory should be used by logstash and its plugins
    # for any persistent needs. Defaults to LOGSTASH_HOME/data
    #
    path.data: /var/lib/logstash
    #
    # ------------ Pipeline Settings --------------
    #
    # The ID of the pipeline.
    #
    # pipeline.id: main
    #
    # Set the number of workers that will, in parallel, execute the filters+outputs
    # stage of the pipeline.
    #
    # This defaults to the number of the host's CPU cores.
    #
    # pipeline.workers: 2
    #
    # How many events to retrieve from inputs before sending to filters+workers
    #
    # pipeline.batch.size: 125
    #
    # How long to wait in milliseconds while polling for the next event
    # before dispatching an undersized batch to filters+outputs
    #
    # pipeline.batch.delay: 50
    #
    # Force Logstash to exit during shutdown even if there are still inflight
    # events in memory. By default, logstash will refuse to quit until all
    # received events have been pushed to the outputs.
    #
    # WARNING: enabling this can lead to data loss during shutdown
    #
    # pipeline.unsafe_shutdown: false
    #
    # ------------ Pipeline Configuration Settings --------------
    #
    # Where to fetch the pipeline configuration for the main pipeline
    #
    # path.config:
    #
    # Pipeline configuration string for the main pipeline
    #
    # config.string:
    #
    # At startup, test if the configuration is valid and exit (dry run)
    #
    # config.test_and_exit: false
    #
    # Periodically check if the configuration has changed and reload the pipeline
    # This can also be triggered manually through the SIGHUP signal
    #
    # config.reload.automatic: false
    #
    # How often to check if the pipeline configuration has changed (in seconds)
    #
    # config.reload.interval: 3s
    #
    # Show fully compiled configuration as debug log message
    # NOTE: --log.level must be 'debug'
    #
    # config.debug: false
    #
    # When enabled, process escaped characters such as \n and \" in strings in the
    # pipeline configuration files.
    #
    # config.support_escapes: false
    #
    # ------------ Module Settings ---------------
    # Define modules here.  Modules definitions must be defined as an array.
    # The simple way to see this is to prepend each `name` with a `-`, and keep
    # all associated variables under the `name` they are associated with, and 
    # above the next, like this:
    #
    # modules:
    #   - name: MODULE_NAME
    #     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
    #     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
    #     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
    #     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
    #
    # Module variable names must be in the format of 
    #
    # var.PLUGIN_TYPE.PLUGIN_NAME.KEY
    #
    # modules:
    #
    # ------------ Cloud Settings ---------------
    # Define Elastic Cloud settings here.
    # Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
    # and it may have an label prefix e.g. staging:dXMtZ...
    # This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
    # cloud.id: <identifier>
    #
    # Format of cloud.auth is: <user>:<pass>
    # This is optional
    # If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
    # If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
    # cloud.auth: elastic:<password>
    #
    # ------------ Queuing Settings --------------
    #
    # Internal queuing model, "memory" for legacy in-memory based queuing and
    # "persisted" for disk-based acked queueing. Defaults is memory
    #
    # queue.type: memory
    #
    # If using queue.type: persisted, the directory path where the data files will be stored.
    # Default is path.data/queue
    #
    # path.queue:
    #
    # If using queue.type: persisted, the page data files size. The queue data consists of
    # append-only data files separated into pages. Default is 64mb
    #
    # queue.page_capacity: 64mb
    #
    # If using queue.type: persisted, the maximum number of unread events in the queue.
    # Default is 0 (unlimited)
    #
    # queue.max_events: 0
    #
    # If using queue.type: persisted, the total capacity of the queue in number of bytes.
    # If you would like more unacked events to be buffered in Logstash, you can increase the
    # capacity using this setting. Please make sure your disk drive has capacity greater than
    # the size specified here. If both max_bytes and max_events are specified, Logstash will pick
    # whichever criteria is reached first
    # Default is 1024mb or 1gb
    #
    # queue.max_bytes: 1024mb
    #
    # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
    # Default is 1024, 0 for unlimited
    #
    # queue.checkpoint.acks: 1024
    #
    # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
    # Default is 1024, 0 for unlimited
    #
    # queue.checkpoint.writes: 1024
    #
    # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
    # Default is 1000, 0 for no periodic checkpoint.
    #
    # queue.checkpoint.interval: 1000
    #
    # ------------ Dead-Letter Queue Settings --------------
    # Flag to turn on dead-letter queue.
    #
    # dead_letter_queue.enable: false

    # If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
    # will be dropped if they would increase the size of the dead letter queue beyond this setting.
    # Default is 1024mb
    # dead_letter_queue.max_bytes: 1024mb

    # If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
    # Default is path.data/dead_letter_queue
    #
    # path.dead_letter_queue:
    #
    # ------------ Metrics Settings --------------
    #
    # Bind address for the metrics REST endpoint
    #
    # NOTE(review): this binds the endpoint to the node's own address instead of the
    # loopback default, so it is reachable from the network. Confirm that access to the
    # port (see http.port below) is limited to monitoring hosts by firewall or network policy.
    http.host: "ournodeipaddress"
    #
    # Bind port for the metrics REST endpoint, this option also accept a range
    # (9600-9700) and logstash will pick up the first available ports.
    #
    # http.port: 9600-9700
    #
    # ------------ Debugging Settings --------------
    #
    # Options for log.level:
    #   * fatal
    #   * error
    #   * warn
    #   * info (default)
    #   * debug
    #   * trace

    # NOTE(review): debug (and trace) output is verbose and may include event data and
    # configuration details in the files under path.logs. Return this to info once
    # troubleshooting is finished, and keep the log directory readable only by those who need it.
    log.level: debug
    path.logs: /var/log/logstash
    #
    # ------------ Other Settings --------------
    #
    # Where to find custom plugins
    # path.plugins: []

From the logstash-plain log it seems only the default 4 pipelines are loaded:

    [2020-04-22T15:51:14,281][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>"security", :thread=>"#<Thread:0x67544b1e@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
    [2020-04-22T15:51:14,497][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>"stores", :thread=>"#<Thread:0x5d8ea275@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
    [2020-04-22T15:51:15,104][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>"system", :thread=>"#<Thread:0x4cf2b66c@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
    [2020-04-22T15:51:15,458][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>"app", :thread=>"#<Thread:0x5c202a7b@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
    [2020-04-22T15:51:16,430][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
    [2020-04-22T15:51:16,430][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}

We checked the syntax of the .conf file inside the first of the custom pipelines and the logstash conf utility replied "OK" (so the .conf files should have a correct syntax).

What can be checked to make sure pipelines are loaded correctly ?

After increasing the Logstash log level to TRACE, it turns out the pipeline fails during the loading phase:

[2020-04-23T10:18:08,202][TRACE][logstash.agent           ] Converge results {:success=>false, :failed_actions=>["id: kubernetes-internal, action_type: LogStash::PipelineAction::Create, message: Could not execute action: LogStash::PipelineAction::Create/pipeline_id:kubernetes-internal, action_result: false"]

Looking further, it appears we have syntax errors in the 02-filter.conf configuration file; the Logstash config checker complains:

[FATAL] 2020-04-24 07:54:15.537 [LogStash::Runner] runner - The given configuration is invalid. Reason: Expected one of #, => at line 149, column 10 (byte 4992) after

      # Excerpt of 02-filter.conf as posted: the enclosing conditional starts before this
      # excerpt and continues after it. The first branch tags events as haproxy logs; the
      # second parses ambassador access logs (grok, type conversion, useragent, date, fingerprint).
      else {
          if [kubernetes][labels][version] != "v2" {
            date {
              match => [ "syslog_timestamp", "ISO8601" ]
              remove_field => [ "syslog_timestamp" ]
            }
          }
          prune {
            whitelist_names => ["^message_csv$","^host$","^beat","^source$","^type$","^offset$","@timestamp","kubernetes"]
          }
          mutate {
            add_tag => [ "haproxy-logs" ]
            remove_tag => [ "_csvparsefailure" ]
          }
        }
        # NOTE(review): counting braces within this excerpt, the `}` directly above closes the
        # `else {` that opens the excerpt, so the `else` below follows an else-block rather
        # than an `if`. The parser would presumably then read `else` as a plugin name and
        # expect `=>` after `if` on the marked line, which matches the reported error.
        # Confirm the nesting against the full file.
        else {
          if [kubernetes][labels][service] in ['ambassador','ambassador-http'] { #<----- ERROR LINE HERE
            grok {
              match => { "message" => '^%{TIMESTAMP_ISO8601:time} %{IP:clientip}:%{NUMBER:port} %{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion} %{NUMBER:http_status} %{NOTSPACE:response_flag} %{NUMBER:bytes_received} %{NUMBER:bytes_sent} %{NUMBER:request_time} \'%{NOTSPACE:X-Forwarded-for}\' \'%{NOTSPACE:X-Widi-Api}\' \'%{DATA:agent}\' \'%{NOTSPACE:UUID}\' \'%{NOTSPACE:X-Forwarded-Client-Cert}\' \'%{NOTSPACE:authority}\' \'%{NOTSPACE:upstream_host}\'$' }
            }
        # Convert the numeric fields captured by grok from strings to integers.
        mutate {
          convert => {
                "http_status"    => "integer"
                "bytes_sent"     => "integer"
                "bytes_received" => "integer"
                "request_time"   => "integer"
                }
        }
        useragent {
          source => "agent"
        }
        # Use the access-log timestamp as the event time, then drop the raw field.
        date {
          match => [ "time", "ISO8601" ]
          remove_field => [ "time" ]
        }
        mutate {
           add_field => { 'type' => 'ambassador'}
        }
        # Keyed SHA256 fingerprint over message+offset, stored in @metadata.
        # NOTE(review): `key` is a literal in a config file that the directory listing on
        # this page shows as mode -rw-r--r--. If it is a real key rather than a placeholder,
        # consider storing it in the Logstash keystore and referencing it as
        # "${FINGERPRINT_KEY}" (name constructed for this note), and tightening file permissions.
        fingerprint {
          source => ["message","offset"]
          target => "[@metadata][fingerprint]"
          key => "OUR-LOG-KEY-123"
          base64encode => true
          method => "SHA256"
          concatenate_sources => true
        }
        # Drop the raw message only when grok parsed it, so failed events keep their original text.
        if "_grokparsefailure" not in [tags] {
          mutate {
            remove_field => [ "message" ]
          }
        }
      }
      }
      else {
