Pipeline does not work

Please help me understand why my pipeline does not work.

pipelines.yml

- pipeline.id: main
  path.config: "/usr/share/logstash/config/conf.d/*.conf"

- pipeline.id: ms001-backup01
  path.config: "/usr/share/logstash/config/conf.d.backup01/*.conf"

- pipeline.id: ms001-backup02
  path.config: "/usr/share/logstash/config/conf.d.backup02/*.conf"

- pipeline.id: ms001-backup03
  path.config: "/usr/share/logstash/config/conf.d.backup03/*.conf"

logstash/settings/conf.d/010-beats-input.conf

input {
  beats {
    port => 5044
  }
}

output {
    if [type] == ms001-back1 {
        pipeline { send_to => [ "ms001-backup01" ] }
    }
    else if [type] == ms001-back2 {
        pipeline { send_to => [ "ms001-backup02" ] }
    }
}
/logstash/settings/conf.d.backup01/010-beats-input.conf
input { pipeline { address => "ms001-backup01" } }

/logstash/settings/conf.d.backup02/010-beats-input.conf
input { pipeline { address => "ms001-backup02" } }
/logstash/settings/conf.d.backup01/220-elk-output.conf
output {
  elasticsearch {
    hosts => ["10.0.18.88:9200"]
    index => "ms001-backup01-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "changeme"
  }
}
/logstash/settings/conf.d.backup02/220-elk-output.conf

output {
  elasticsearch {
    hosts => ["10.0.18.88:9200"]
    index => "ms001-backup02-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "changeme"
  }
}

logstash.log


logstash         | WARNING: An illegal reflective access operation has occurred
logstash         | WARNING: Illegal reflective access by com.headius.backport9.modules.Modules (file:/usr/share/logstash/logstash-core/lib/jars/jruby-complete-9.2.11.1.jar) to method sun.nio.ch.NativeThread.signal(long)
logstash         | WARNING: Please consider reporting this to the maintainers of com.headius.backport9.modules.Modules
logstash         | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
logstash         | WARNING: All illegal access operations will be denied in a future release
logstash         | Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
logstash         | [INFO ] 2020-06-22 19:29:12.764 [main] writabledirectory - Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
logstash         | [INFO ] 2020-06-22 19:29:12.771 [main] writabledirectory - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
logstash         | [INFO ] 2020-06-22 19:29:13.115 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"7.8.0", "jruby.version"=>"jruby 9.2.11.1 (2.5.7) 2020-03-25 b1f55b1a40 OpenJDK 64-Bit Server VM 11.0.7+10-LTS on 11.0.7+10-LTS +jit [linux-x86_64]"}
logstash         | [INFO ] 2020-06-22 19:29:13.121 [LogStash::Runner] agent - No persistent UUID file found. Generating new UUID {:uuid=>"ff420293-5481-4e60-a813-e9fe2028ebc0", :path=>"/usr/share/logstash/data/uuid"}
logstash         | [WARN ] 2020-06-22 19:29:13.228 [LogStash::Runner] pipelineregisterhook - Internal collectors option for Logstash monitoring is deprecated and targeted for removal in the next major version.
logstash         | Please configure Metricbeat to monitor Logstash. Documentation can be found at: 
logstash         | https://www.elastic.co/guide/en/logstash/current/monitoring-with-metricbeat.html
logstash         | [INFO ] 2020-06-22 19:29:13.889 [LogStash::Runner] licensereader - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@elasticsearch:9200/]}}
logstash         | [WARN ] 2020-06-22 19:29:14.228 [LogStash::Runner] licensereader - Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://elastic:xxxxxx@elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elastic:xxxxxx@elasticsearch:9200/][Manticore::SocketException] Connection refused (Connection refused)"}
logstash         | [WARN ] 2020-06-22 19:29:14.238 [LogStash::Runner] licensereader - Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://elastic:xxxxxx@elasticsearch:9200/][Manticore::SocketException] Connection refused (Connection refused) {:url=>http://elastic:xxxxxx@elasticsearch:9200/, :error_message=>"Elasticsearch Unreachable: [http://elastic:xxxxxx@elasticsearch:9200/][Manticore::SocketException] Connection refused (Connection refused)", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
logstash         | [ERROR] 2020-06-22 19:29:14.239 [LogStash::Runner] licensereader - Unable to retrieve license information from license server {:message=>"Elasticsearch Unreachable: [http://elastic:xxxxxx@elasticsearch:9200/][Manticore::SocketException] Connection refused (Connection refused)"}
logstash         | [ERROR] 2020-06-22 19:29:14.246 [LogStash::Runner] internalpipelinesource - Failed to fetch X-Pack information from Elasticsearch. This is likely due to failure to reach a live Elasticsearch cluster.
logstash         | [INFO ] 2020-06-22 19:29:16.458 [Converge PipelineAction::Create<main>] Reflections - Reflections took 574 ms to scan 1 urls, producing 21 keys and 41 values 
logstash         | [INFO ] 2020-06-22 19:29:17.414 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/usr/share/logstash/config/conf.d/010-beats-input.conf"], :thread=>"#<Thread:0x3017cf89@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:121 run>"}
logstash         | [INFO ] 2020-06-22 19:29:18.558 [[ms001-backup02]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"ms001-backup02", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/usr/share/logstash/config/conf.d.backup02/010-beats-input.conf", "/usr/share/logstash/config/conf.d.backup02/100-filter.conf", "/usr/share/logstash/config/conf.d.backup02/220-elk-output.conf"], :thread=>"#<Thread:0x5ecdb57c@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:121 run>"}
logstash         | [INFO ] 2020-06-22 19:29:18.655 [[ms001-backup01]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"ms001-backup01", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/usr/share/logstash/config/conf.d.backup01/010-beats-input.conf", "/usr/share/logstash/config/conf.d.backup01/100-filter.conf", "/usr/share/logstash/config/conf.d.backup01/220-elk-output.conf"], :thread=>"#<Thread:0x79e0e08e@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:121 run>"}
logstash         | [INFO ] 2020-06-22 19:29:20.803 [[ms001-backup01]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"ms001-backup01"}
logstash         | [INFO ] 2020-06-22 19:29:20.834 [[ms001-backup02]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"ms001-backup02"}
logstash         | [INFO ] 2020-06-22 19:29:20.949 [[main]-pipeline-manager] beats - Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
logstash         | [INFO ] 2020-06-22 19:29:20.978 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
logstash         | [INFO ] 2020-06-22 19:29:21.001 [Agent thread] agent - Pipelines running {:count=>3, :running_pipelines=>[:"ms001-backup01", :"ms001-backup02", :main], :non_running_pipelines=>[]}
logstash         | [INFO ] 2020-06-22 19:29:21.158 [[main]<beats] Server - Starting server on port: 5044
logstash         | [INFO ] 2020-06-22 19:29:21.265 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
logstash         | [ERROR] 2020-06-22 19:29:44.245 [monitoring-license-manager] licensereader - Unable to retrieve license information from license server {:message=>"No Available connections"}
logstash         | [WARN ] 2020-06-22 19:29:45.316 [Ruby-0-Thread-2: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.5.1-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:241] licensereader - Restored connection to ES instance {:url=>"http://elastic:xxxxxx@elasticsearch:9200/"}
logstash         | [INFO ] 2020-06-22 19:29:46.918 [Ruby-0-Thread-2: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.5.1-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:241] licensereader - ES Output version determined {:es_version=>7}
logstash         | [WARN ] 2020-06-22 19:29:46.919 [Ruby-0-Thread-2: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.5.1-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:241] licensereader - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
logstash         | [INFO ] 2020-06-22 19:30:14.291 [monitoring-license-manager] internalpipelinesource - Monitoring License OK
logstash         | [INFO ] 2020-06-22 19:30:14.291 [monitoring-license-manager] internalpipelinesource - Validated license for monitoring. Enabling monitoring pipeline.
logstash         | [INFO ] 2020-06-22 19:30:14.578 [[.monitoring-logstash]-pipeline-manager] elasticsearchmonitoring - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@elasticsearch:9200/]}}
logstash         | [WARN ] 2020-06-22 19:30:14.591 [[.monitoring-logstash]-pipeline-manager] elasticsearchmonitoring - Restored connection to ES instance {:url=>"http://elastic:xxxxxx@elasticsearch:9200/"}
logstash         | [INFO ] 2020-06-22 19:30:14.598 [[.monitoring-logstash]-pipeline-manager] elasticsearchmonitoring - ES Output version determined {:es_version=>7}
logstash         | [WARN ] 2020-06-22 19:30:14.599 [[.monitoring-logstash]-pipeline-manager] elasticsearchmonitoring - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
logstash         | [INFO ] 2020-06-22 19:30:14.636 [[.monitoring-logstash]-pipeline-manager] elasticsearchmonitoring - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearchMonitoring", :hosts=>["http://elasticsearch:9200"]}
logstash         | [WARN ] 2020-06-22 19:30:14.636 [[.monitoring-logstash]-pipeline-manager] javapipeline - 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary
logstash         | [INFO ] 2020-06-22 19:30:14.639 [[.monitoring-logstash]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x510e863a@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:121 run>"}
logstash         | [INFO ] 2020-06-22 19:30:14.689 [[.monitoring-logstash]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>".monitoring-logstash"}
logstash         | [INFO ] 2020-06-22 19:30:14.700 [monitoring-license-manager] agent - Pipelines running {:count=>4, :running_pipelines=>[:"ms001-backup01", :".monitoring-logstash", :"ms001-backup02", :main], :non_running_pipelines=>[]}



Why doesn’t it work? Where did I go wrong?

What is in the logstash log? Which pipelines does it say are running?

I have attached the log.

OK, so the pipelines are running. I am actually surprised that

if [type] == ms001-back1

does not generate an exception. ms001-back1 should be in double quotes.
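For example, something like this (same field and values as in your config, just with the string literals quoted):

    if [type] == "ms001-back1" {
        pipeline { send_to => [ "ms001-backup01" ] }
    }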

Maybe [type] does not have the value you expect. Can you add

 output { stdout { codec => rubydebug } }

to the "main" pipeline and see what an event looks like? Are you sure [type] is a top level field?

/opt/containers/elk/logstash/settings/conf.d/010-beats-input.conf

input {
  beats {
    port => 5044
  }
}



output { stdout { codec => rubydebug } }
 "version" => "1.5.0"
         },
         "@version" => "1",
         "agent" => {
         "hostname" => "ms001-backup02.nes.lan",
         "id" => "882aff82-f848-4c33-b28d-76353188573b",
         "ephemeral_id" => "0d81d53b-0a34-4064-9e11-f5e2ddc28505",
         "version" => "7.7.1",
         "type" => "filebeat"
         },
         "input" => {
         "type" => "log"
         },
         "host" => {
         "name" => "ms001-backup02.nes.lan"
         },
         "fields" => {
         "type" => "ms001-backup02"
         },
         "@timestamp" => 2020-06-22T22:25:49.202Z,
         "message" => "  SD termination status:  OK"
         }
         {
        "tags" => [
         [0] "beats_input_codec_plain_applied"
         ],
         "log" => {
         "offset" => 76832,
         "file" => {
         "path" => "/var/log/bareos/bareos.log"
          }
          },
          "ecs" => {
          "version" => "1.5.0"
          },
          "@version" => "1",
          "agent" => {
          "type" => "filebeat",
           "id" => "882aff82-f848-4c33-b28d-76353188573b",
           "ephemeral_id" => "0d81d53b-0a34-4064-9e11-f5e2ddc28505",
          "version" => "7.7.1",
          "hostname" => "ms001-backup02.nes.lan"
          },
          "input" => {
          "type" => "log"
          },
          "host" => {
          "name" => "ms001-backup02.nes.lan"
          },
          "fields" => {
          "type" => "ms001-backup02"
          },
          "@timestamp" => 2020-06-22T22:25:49.202Z,
           "message" => "  Bareos binary info:     bareos.org build: Get official binaries and vendor support on bareos.com"
          }
          {
         "tags" => [
          [0] "beats_input_codec_plain_applied"
          ],
          "log" => {
          "offset" => 76774,
          "file" => {
           "path" => "/var/log/bareos/bareos.log"
          }
          },
          "ecs" => {
          "version" => "1.5.0"
          },
         "@version" => "1",
         "agent" => {
         "hostname" => "ms001-backup02.nes.lan"
		 "id" => "882aff82-f848-4c33-b28d-76353188573b",
         "ephemeral_id" => "0d81d53b-0a34-4064-9e11-f5e2ddc28505",
               "version" => "7.7.1",
                 "type" => "filebeat
	  "input" => {
      "type" => "log"
    },
           "host" => {
"name" => "ms001-backup02.nes.lan"
},
  "fields" => {
"type" => "ms001-backup02"
},
"@timestamp" => 2020-06-22T22:25:49.202Z,
"message" => "  FD termination status:  OK"

OK, the field you need to test is [fields][type], not [type]. You may have the option of setting the fields_under_root option to add the field as [type] rather than [fields][type], but I do not know enough about your use case to say whether that would work.
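If you did go the fields_under_root route, the Filebeat side would look something like this (just a sketch, reusing the log path and value shown in your rubydebug output; whether it fits the rest of your Filebeat config is your call):

    # filebeat.yml (sketch)
    - type: log
      paths:
        - /var/log/bareos/bareos.log
      fields:
        type: ms001-backup02
      fields_under_root: true   # promotes [fields][type] to a top-level [type] field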

Can you try this?...

    if [fields][type] == "ms001-back1"
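In context, the output section of the main pipeline would then look roughly like this (a sketch; the compared values have to match whatever Filebeat actually puts into [fields][type], which in your debug output is "ms001-backup02"):

    output {
        if [fields][type] == "ms001-back1" {
            pipeline { send_to => [ "ms001-backup01" ] }
        }
        else if [fields][type] == "ms001-back2" {
            pipeline { send_to => [ "ms001-backup02" ] }
        }
    }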
