All pipelines shut down after one fails

I have two pipelines, one for Kafka and the other for HTTP. When Kafka is not working and Logstash cannot create a connection to it, the HTTP pipeline gets terminated too. I asked ChatGPT and it said:

Run each pipeline as a separate Logstash instance or in separate Logstash containers. This way, if one pipeline fails, it won't affect the others since they are running independently.

But I do not like this solution.
Is there any way to solve this? I searched through Discuss but could not find anything.

This is my Logstash log; after this, Logstash shuts down and restarts.

The logs in the screenshot were not readable enough, so here are the logs as text. Thank you, guys.

2023-10-09 15:18:49 [2023-10-09T12:18:49,583][INFO ][logstash.javapipeline    ][http] Pipeline started {"pipeline.id"=>"http"}
2023-10-09 15:18:49 [2023-10-09T12:18:49,585][INFO ][logstash.inputs.http     ][http][4c5283db339dddc365c0a40435a5a0ee2576b7d7cf43baf9b807df506847db91] Starting http input listener {:address=>"0.0.0.0:8080", :ssl=>"false"}
2023-10-09 15:18:49 [2023-10-09T12:18:49,597][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:http, :kafka], :non_running_pipelines=>[]}
2023-10-09 15:18:57 [2023-10-09T12:18:57,528][WARN ][org.apache.kafka.clients.ClientUtils][kafka][024643a58b325b77c30bc446938004c85e81dd318085326db53c1d84d3b52c4a] Couldn't resolve server kafka:29092 from bootstrap.servers as DNS resolution failed for kafka
2023-10-09 15:18:57 [2023-10-09T12:18:57,528][INFO ][org.apache.kafka.common.metrics.Metrics][kafka][024643a58b325b77c30bc446938004c85e81dd318085326db53c1d84d3b52c4a] Metrics scheduler closed
2023-10-09 15:18:57 [2023-10-09T12:18:57,528][INFO ][org.apache.kafka.common.metrics.Metrics][kafka][024643a58b325b77c30bc446938004c85e81dd318085326db53c1d84d3b52c4a] Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-10-09 15:18:57 [2023-10-09T12:18:57,528][INFO ][org.apache.kafka.common.metrics.Metrics][kafka][024643a58b325b77c30bc446938004c85e81dd318085326db53c1d84d3b52c4a] Metrics reporters closed
2023-10-09 15:18:57 [2023-10-09T12:18:57,531][INFO ][org.apache.kafka.common.utils.AppInfoParser][kafka][024643a58b325b77c30bc446938004c85e81dd318085326db53c1d84d3b52c4a] App info kafka.consumer for logstash-0 unregistered
2023-10-09 15:18:57 [2023-10-09T12:18:57,533][ERROR][logstash.inputs.kafka    ][kafka][024643a58b325b77c30bc446938004c85e81dd318085326db53c1d84d3b52c4a] Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommonConfig::ConfigException: No resolvable bootstrap urls given in bootstrap.servers>}
2023-10-09 15:18:57 warning: thread "[kafka]<kafka" terminated with exception (report_on_exception is true):
2023-10-09 15:18:57 ArgumentError: wrong number of arguments (given 2, expected 0..1)
2023-10-09 15:18:57     translate at /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/i18n-1.14.1/lib/i18n.rb:210
2023-10-09 15:18:57   inputworker at /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:427
2023-10-09 15:18:57   start_input at /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405
2023-10-09 15:18:57 [2023-10-09T12:18:57,703][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<ArgumentError: wrong number of arguments (given 2, expected 0..1)>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/i18n-1.14.1/lib/i18n.rb:210:in `translate'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:427:in `inputworker'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405:in `block in start_input'"]}
2023-10-09 15:18:57 [2023-10-09T12:18:57,747][INFO ][logstash.javapipeline    ][kafka] Pipeline terminated {"pipeline.id"=>"kafka"}
2023-10-09 15:18:57 [2023-10-09T12:18:57,761][INFO ][logstash.javapipeline    ][http] Pipeline terminated {"pipeline.id"=>"http"}

Hello,

Please avoid sharing screenshots; they are sometimes hard to read, and it is not possible to copy from them to quote or comment on something, or to replicate an issue. Always share log lines and configurations using the Preformatted text option, the </> button.

Check your logs: you have a configuration error in one of your pipelines. Look at the FATAL log line.

Logstash won't start if any of the configurations has an error.
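You can check a configuration for syntax errors without fully starting Logstash by using the --config.test_and_exit flag; for example, something like this (assuming the default install path inside the container, adjust to your setup):

/usr/share/logstash/bin/logstash --config.test_and_exit -f /usr/share/logstash/pipeline/kafka.conf

Note that this only validates the configuration syntax; it will not catch runtime problems such as failing connections.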

You need to share your pipeline configurations; something in them is not correct.

I actually did share it as a reply, but the system removed it; I think it got flagged as spam. Anyway, here is my config.
This is my pipelines.yml. The reason I used config.string in the kafka pipeline is that Logstash somehow did not read the external kafka.conf file, so I had to do it this way.

- pipeline.id: http
  path.config: "pipeline/logstash.conf"
  pipeline.batch.size: 50
  pipeline.batch.delay: 20

- pipeline.id: kafka
  # path.config: "pipeline/kafka.conf"
  config.string: |
    input {
      kafka {
        bootstrap_servers => "kafka:29092"
        topics => ["kafka-activity-log"]
        codec => json
      }
    }

    filter {
      json {
        source => "message"
        target => "parsed_json"
      }

      if "_jsonparsefailure" not in [tags] {
        mutate {
          remove_field => ["message", "@version", "host", "url", "event", "user_agent"]
        }

        ruby {
          code => '
            empty_fields = 0
            ["user_id", "service_name", "activity_type"].each do |field|
              if event.get(field).to_s.empty?
                empty_fields += 1
              end
            end

            if empty_fields >= 2
              event.set("[@metadata][index]", "user_activity_log_missing_data")
            elsif event.get("[user_id]").to_s.empty?
              event.set("[@metadata][index]", "user_activity_log_empty_user_id")
            elsif event.get("[service_name]").to_s.empty?
              event.set("[@metadata][index]", "user_activity_log_empty_service_name")
            elsif event.get("[activity_type]").to_s.empty?
              event.set("[@metadata][index]", "user_activity_log_empty_activity_type")
            elsif event.get("[error][message]").to_s.empty?
              event.set("[@metadata][index]", "user_activity_log_empty_error_message")
            elsif event.get("[error][component_id]").to_s.empty?
              event.set("[@metadata][index]", "user_activity_log_empty_component_id")
            else
              event.set("[@metadata][index]", "user_activity_log")
            end
          '
        }
      } else {
        mutate {
          add_tag => ["_jsonparsefailure", "_kafkaexceptionfailure"]
        }
        ruby {
          code => '
            event.set("[@metadata][index]", "user_activity_log_invalid_data")
          '
        }
      }

      if "_kafkaexceptionfailure" in [tags] {
        kafka {
          bootstrap_servers => "kafka:29092"
          codec => json
          topic_id => "dlq-topic"
        }
        drop {} # Drop the original event to prevent it from being processed further
      }
    }

    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        user => "elastic"
        password => "${ELASTIC_PASSWORD}"
        index => "%{[@metadata][index]}"
        manage_template => true
        action => "index"
      }

      stdout { codec => rubydebug }
    }

  pipeline.batch.size: 30
  pipeline.batch.delay: 20

After adding these parts to the config string, it looks like it is working, but I am not sure:

mutate {
  add_tag => ["_jsonparsefailure", "_kafkaexceptionfailure"]
}

if "_kafkaexceptionfailure" in [tags] {
  kafka {
    bootstrap_servers => "kafka:29092"
    codec => json
    topic_id => "dlq-topic"
  }
  drop {} # Drop the original event to prevent it from being processed further
}

You should try to fix this first. If the path and permissions are correct, Logstash will read the file. Use the full path in pipelines.yml, not the relative path; you are currently using a relative path.
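For example, something like this (a sketch assuming the official Docker image, where pipeline files usually live under /usr/share/logstash/pipeline/; adjust the paths to wherever your files are actually mounted):

- pipeline.id: http
  path.config: "/usr/share/logstash/pipeline/logstash.conf"
  pipeline.batch.size: 50
  pipeline.batch.delay: 20

- pipeline.id: kafka
  path.config: "/usr/share/logstash/pipeline/kafka.conf"
  pipeline.batch.size: 30
  pipeline.batch.delay: 20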

I see no issue in your input. It seems that the FATAL error is generated because your Logstash cannot connect to your Kafka brokers, as you can see in the WARN and ERROR lines in the log you shared.

If Logstash cannot connect to your Kafka brokers, it will not start either.

What does your advertised_listeners look like in your Kafka configuration?
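For reference, in a Docker Compose setup the broker usually needs an internal listener advertised with the same hostname that Logstash resolves; a minimal sketch (assuming the Confluent cp-kafka image and the kafka:29092 address from your bootstrap_servers, both assumptions on my side):

kafka:
  environment:
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

If Logstash and Kafka are not on the same Docker network, the kafka hostname will not resolve at all, which matches the DNS error in your log.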

I don't think so; your pipeline is probably not even reaching that point. Also, you have another issue.

You have a kafka directive in your filter block. There is no kafka filter; you need to remove it.

Remove this; there is no kafka filter. You can have a kafka plugin only in the input or output.
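If the intention was to ship failed events to a DLQ topic, that has to be done with a conditional in the output block instead; a rough sketch reusing your own settings (untested, just to illustrate the structure):

output {
  if "_kafkaexceptionfailure" in [tags] {
    kafka {
      bootstrap_servers => "kafka:29092"
      codec => json
      topic_id => "dlq-topic"
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][index]}"
    }
  }
}

With the routing done in the output, you also do not need the drop {} anymore, since each event only goes to the branch it matches.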

First of all, I appreciate your response.
I changed the pipeline as you said and it worked! It really did not work before.
This is my pipelines.yml file right now:

- pipeline.id: http
  path.config: "pipeline/logstash.conf"
  pipeline.batch.size: 50
  pipeline.batch.delay: 20

- pipeline.id: kafka
  path.config: "pipeline/kafka.conf"
  pipeline.batch.size: 30
  pipeline.batch.delay: 20

The log says the pipelines and the conf file work:

2023-10-09 16:43:18 [2023-10-09T13:43:18,576][INFO ][logstash.javapipeline    ][http] Pipeline started {"pipeline.id"=>"http"}
2023-10-09 16:43:18 [2023-10-09T13:43:18,578][INFO ][logstash.inputs.http     ][http][4c5283db339dddc365c0a40435a5a0ee2576b7d7cf43baf9b807df506847db91] Starting http input listener {:address=>"0.0.0.0:8080", :ssl=>"false"}
2023-10-09 16:43:18 [2023-10-09T13:43:18,590][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:http, :kafka], :non_running_pipelines=>[]}
2023-10-09 16:43:26 [2023-10-09T13:43:26,528][WARN ][org.apache.kafka.clients.ClientUtils][kafka][7cb9977aa7dcf217b367ddef40559cfeb515482dc4b00bec4f923a2bf7096339] Couldn't resolve server kafka:29092 from bootstrap.servers as DNS resolution failed for kafka

And as you can see, after that WARN Logstash throws an error saying it cannot create a Kafka consumer, followed by the same error as before:

2023-10-09 16:43:26 [2023-10-09T13:43:26,658][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<ArgumentError: wrong number of arguments (given 2, expected 0..1)>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/i18n-1.14.1/lib/i18n.rb:210:in `translate'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:427:in `inputworker'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405:in `block in start_input'"]}
2023-10-09 16:43:26 [2023-10-09T13:43:26,666][ERROR][logstash.agent           ] Exception handling servers {:error=>#<IOError: closed stream>, :req=>nil}
2023-10-09 16:43:26 [2023-10-09T13:43:26,835][INFO ][logstash.javapipeline    ][kafka] Pipeline terminated {"pipeline.id"=>"kafka"}
2023-10-09 16:43:26 [2023-10-09T13:43:26,835][INFO ][logstash.javapipeline    ][http] Pipeline terminated {"pipeline.id"=>"http"}

Again, it terminates everything.

By the way, this is kafka.conf:

input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ["kafka-activity-log"]
    codec => json {
      target => "document" # Parse the JSON into the [document] field
    }
  }

}

filter {
  json {
    source => "message"
    target => "parsed_json"
  }

  if "_jsonparsefailure" not in [tags] {
    mutate {
      remove_field => ["message", "@version",  "event", "user_agent"]
    }

    ruby {
      code => '
        empty_fields = 0
        ["user_id", "service_name", "activity_type", "error"].each do |field|
          if event.get(field).to_s.empty?
            empty_fields += 1
          end
        end

        if empty_fields >= 2
          event.set("[@metadata][index]", "user_activity_log_missing_data")
        elsif event.get("[user_id]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_user_id")
        elsif event.get("[service_name]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_service_name")
        elsif event.get("[activity_type]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_activity_type")
        elsif event.get("[error.message]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_error_message")
        else
          event.set("[@metadata][index]", "user_activity_log")
        end
      '
    }
  } else {
    ruby {
      code => '
        event.set("[@metadata][index]", "user_activity_log_invalid_data")
      '
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "%{[@metadata][index]}"
    manage_template => true
    action => "index"
  }

  stdout { codec => rubydebug }
}

Is this Logstash's typical behaviour? Does it terminate the other pipelines if any one pipeline is not working?

Again, thank you for your time.

As mentioned before, your Logstash is not able to connect to your Kafka brokers. If it cannot connect to the Kafka broker while starting, it will fail; you need to solve this connection issue or it will not run.

If it cannot connect to the Kafka broker, it will not be able to create a Kafka consumer, and then you will get the Kafka exception.

Did you check this?

What does your advertised_listeners look like in your Kafka configuration?

In some cases, yes, an error in a pipeline can impact the Logstash instance running those pipelines, but your issue is different: Logstash requires that all pipelines work at startup. If one of them does not work while starting, the instance will crash.

The point is this: I intentionally do not start Kafka, to see whether Logstash will keep working if there is an error in one pipeline. There is nothing wrong with Kafka or Logstash when they are both running and connecting.
As I said before, ChatGPT said this:

Run each pipeline as a separate Logstash instance or in separate Logstash containers. This way, if one pipeline fails, it won't affect the others since they are running independently.

And you said this too:

Logstash requires that all pipelines work at startup; if one of them does not work while starting, the instance will crash.

So the solution here is to create an instance for each Logstash pipeline, am I right?

It depends on what you want. Using multiple pipelines allows you to run different pipelines on the same Logstash instance; the events of those pipelines are completely independent from each other, but in some cases an error may impact the whole Logstash instance, mostly at startup.

Your issue is that Logstash is failing to start because of a configuration error that you now say is intentional.

I'm not entirely sure this is the only issue, as you had other errors in your pipeline. Can you use the correct configuration to see if Logstash starts?

Also, I would not trust ChatGPT for anything related to the Elastic Stack without checking the documentation first; there are a couple of examples on the forum of completely made-up things.

But in this case, if you want to completely separate them, then yes, you would need independent instances.
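With Docker Compose that could look something like this (a sketch; the image tag and volume paths are assumptions, adjust them to your setup):

services:
  logstash-http:
    image: docker.elastic.co/logstash/logstash:8.10.2
    volumes:
      - ./pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
    ports:
      - "8080:8080"

  logstash-kafka:
    image: docker.elastic.co/logstash/logstash:8.10.2
    volumes:
      - ./pipeline/kafka.conf:/usr/share/logstash/pipeline/kafka.conf:ro

This way, if the kafka pipeline brings its instance down, the http instance keeps running independently.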

When I run Kafka and Logstash, there is no problem with the current configs; everything is fine.
But when I intentionally do not start Kafka, the Kafka pipeline terminates, and therefore the HTTP pipeline too, and so on.

Additionally, I tested that after everything starts correctly, if I stop/kill the Kafka instance, Logstash throws errors and warnings, and after some time it crashes and restarts.

And you're right, GPT does not help at all; I will try to run each pipeline in a new instance.

I cannot thank you enough; you've helped me a lot. Take care of yourself, I will mark your answer as the solution.



This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.