Pipeline does not exist

Hi,

I'm sending logs from filebeat to logstash but it I'm getting this error from logstash connecting to elastisearch:

Jul 01 14:26:49 elk logstash[5445]: [2020-07-01T14:26:49,288][WARN ][logstash.outputs.elasticsearch][main][64ad1b33ef35eebfd73d0629e2448a7437bbb704ce2d638bab05f9732e6860e7] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"filebeat-7.8.0-2020.07.01", :routing=>nil, :_type=>"_doc", :pipeline=>"filebeat-7.8.0-system-syslog-pipeline"}, #<LogStash::Event:0x1f787851>], :response=>{"index"=>{"_index"=>"filebeat-7.8.0-2020.07.01", "_type"=>"_doc", "_id"=>nil, "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"pipeline with id [filebeat-7.8.0-system-syslog-pipeline] does not exist"}}}}

Here is the Logstash config file:

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5046
    ssl => true
    ssl_certificate_authorities => ["/etc/elasticsearch/ipnew-client-ca.crt"]
    ssl_certificate => "/etc/elasticsearch/ipnew-client.crt"
    ssl_key => "/etc/elasticsearch/ipnew-client.key"
    ssl_verify_mode => "force_peer"

  }
}

output {
  if [@metadata][pipeline] {
    elasticsearch {
     hosts => "https://192.168.2.220:9200"
     manage_template => false
     index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
     pipeline => "%{[@metadata][pipeline]}"
     user => "elastic"
     password => "xxx"
     ssl => true
     ssl_certificate_verification => true
     
     cacert => '/etc/kibana/elasticsearch-ca.pem'
    }
  } else {
    elasticsearch {
      hosts => "https://192.168.2.220:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "xxx"
      ssl => true
      ssl_certificate_verification => true
      
      cacert => '/etc/kibana/elasticsearch-ca.pem'
    }
  }
}

Any help on this?

Regards,

Do you see your ingest pipeline (filebeat-7.8.0-system-syslog-pipeline) if you query GET _ingest/pipeline on your ES cluster? If not, there was probably an error when you tried to set it up.

(I've never used that feature. Just trying to help based on the docs.)

No it is not there, here are the results.

{
  "filebeat-7.7.0-system-auth-pipeline" : {
    "processors" : [
      {
        "grok" : {
          "field" : "message",
          "ignore_missing" : true,
          "pattern_definitions" : {
            "GREEDYMULTILINE" : """(.|
)*""",
            "TIMESTAMP" : "(?:%{TIMESTAMP_ISO8601}|%{SYSLOGTIMESTAMP})"
          },
          "patterns" : [
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: %{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user )?%{DATA:user.name} from %{IPORHOST:source.ip} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?",
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: %{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.ip}",
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}",
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: \\s*%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}",
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}",
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$",
            "%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname}? %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: %{GREEDYMULTILINE:system.auth.message}"
          ]
        }
      },
      {
        "remove" : {
          "field" : "message"
        }
      },
      {
        "rename" : {
          "field" : "system.auth.message",
          "target_field" : "message",
          "ignore_missing" : true
        }
      },
      {
        "set" : {
          "field" : "source.ip",
          "value" : "{{system.auth.ssh.dropped_ip}}",
          "if" : "ctx.containsKey('system') && ctx.system.containsKey('auth') && ctx.system.auth.containsKey('ssh') && ctx.system.auth.ssh.containsKey('dropped_ip')"
        }
      },
      {
        "date" : {
          "if" : "ctx.event.timezone == null",
          "field" : "system.auth.timestamp",
          "target_field" : "@timestamp",
          "formats" : [
            "MMM  d HH:mm:ss",
            "MMM dd HH:mm:ss",
            "ISO8601"
          ],
          "on_failure" : [
            {
              "append" : {
                "field" : "error.message",
                "value" : "{{ _ingest.on_failure_message }}"
              }
            }
          ]
        }
      },
      {
        "date" : {
          "timezone" : "{{ event.timezone }}",
          "on_failure" : [
            {
              "append" : {
                "field" : "error.message",
                "value" : "{{ _ingest.on_failure_message }}"
              }
            }
          ],
          "if" : "ctx.event.timezone != null",
          "field" : "system.auth.timestamp",
          "target_field" : "@timestamp",
          "formats" : [
            "MMM  d HH:mm:ss",
            "MMM dd HH:mm:ss",
            "ISO8601"
          ]
        }
      },
      {
        "remove" : {
          "field" : "system.auth.timestamp"
        }
      },
      {
        "geoip" : {
          "ignore_failure" : true,
          "field" : "source.ip",
          "target_field" : "source.geo"
        }
      },
      {
        "geoip" : {
          "database_file" : "GeoLite2-ASN.mmdb",
          "field" : "source.ip",
          "target_field" : "source.as",
          "properties" : [
            "asn",
            "organization_name"
          ],
          "ignore_missing" : true
        }
      },
      {
        "rename" : {
          "field" : "source.as.asn",
          "target_field" : "source.as.number",
          "ignore_missing" : true
        }
      },
      {
        "rename" : {
          "field" : "source.as.organization_name",
          "target_field" : "source.as.organization.name",
          "ignore_missing" : true
        }
      },
      {
        "script" : {
          "ignore_failure" : true,
          "source" : """if (ctx.system.auth.ssh.event == "Accepted") {                  if (!ctx.containsKey("event")) {                    ctx.event = [:];                  }                  ctx.event.type = "authentication_success";                  ctx.event.category = "authentication";                  ctx.event.action = "ssh_login";                  ctx.event.outcome = "success";                } else if (ctx.system.auth.ssh.event == "Invalid" || ctx.system.auth.ssh.event == "Failed") {                  if (!ctx.containsKey("event")) {                    ctx.event = [:];                  }                  ctx.event.type = "authentication_failure";                  ctx.event.category = "authentication";                  ctx.event.action = "ssh_login";                  ctx.event.outcome = "failure";                }""",
          "lang" : "painless"
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      }
    ],
    "description" : "Pipeline for parsing system authorisation/secure logs"
  },
  "filebeat-7.7.0-system-syslog-pipeline" : {
    "description" : "Pipeline for parsing Syslog messages.",
    "processors" : [
      {
        "grok" : {
          "field" : "message",
          "patterns" : [
            "%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: %{GREEDYMULTILINE:system.syslog.message}",
            "%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{GREEDYMULTILINE:system.syslog.message}",
            "%{TIMESTAMP_ISO8601:system.syslog.timestamp} %{SYSLOGHOST:host.hostname} %{DATA:process.name}(?:\\[%{POSINT:process.pid:long}\\])?: %{GREEDYMULTILINE:system.syslog.message}"
          ],
          "pattern_definitions" : {
            "GREEDYMULTILINE" : """(.|
)*"""
          },
          "ignore_missing" : true
        }
      },
      {
        "remove" : {
          "field" : "message"
        }
      },
      {
        "rename" : {
          "field" : "system.syslog.message",
          "target_field" : "message",
          "ignore_missing" : true
        }
      },
      {
        "date" : {
          "formats" : [
            "MMM  d HH:mm:ss",
            "MMM dd HH:mm:ss",
            "MMM d HH:mm:ss",
            "ISO8601"
          ],
          "on_failure" : [
            {
              "append" : {
                "field" : "error.message",
                "value" : "{{ _ingest.on_failure_message }}"
              }
            }
          ],
          "if" : "ctx.event.timezone == null",
          "field" : "system.syslog.timestamp",
          "target_field" : "@timestamp"
        }
      },
      {
        "date" : {
          "if" : "ctx.event.timezone != null",
          "field" : "system.syslog.timestamp",
          "target_field" : "@timestamp",
          "formats" : [
            "MMM  d HH:mm:ss",
            "MMM dd HH:mm:ss",
            "MMM d HH:mm:ss",
            "ISO8601"
          ],
          "timezone" : "{{ event.timezone }}",
          "on_failure" : [
            {
              "append" : {
                "field" : "error.message",
                "value" : "{{ _ingest.on_failure_message }}"
              }
            }
          ]
        }
      },
      {
        "remove" : {
          "field" : "system.syslog.timestamp"
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  },
  "xpack_monitoring_6" : {
    "description" : "This pipeline upgrades documents from the older version of the Monitoring API to the newer version (7) by fixing breaking changes in those older documents before they are indexed from the older version (6).",
    "version" : 7000199,
    "processors" : [
      {
        "script" : {
          "source" : "ctx._type = null"
        }
      },
      {
        "gsub" : {
          "field" : "_index",
          "pattern" : """(.monitoring-\w+-)6(-.+)""",
          "replacement" : "$17$2"
        }
      }
    ]
  },
  "xpack_monitoring_7" : {
    "description" : "This is a placeholder pipeline for Monitoring API version 7 so that future versions may fix breaking changes.",
    "version" : 7000199,
    "processors" : [ ]
  }
}

any deeper help ?

But you've got filebeat-7.7.0-system-syslog-pipeline which leads me to believe that you are missing an update for ES/ the system module that would include the new pipeline configuration for 7.8 that you are trying to use. Maybe someone else knows more about that.

this is more of a filebeat issue but i believe you will need to run filebeat setup in order to setup the ingest pipelines on ES before using the pipeline directive in logstash.

you have the auth pipelines (which is part of the system module), so my guess is you’re adding syslog to your system module and didn’t run the filebeat setup so the relevant ingest pipelines is not created

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.