First of all, thank you for your response.
I changed the pipeline as you suggested and it worked! It really wasn't working before.
This is my pipelines.yml file right now:
- pipeline.id: http
  path.config: "pipeline/logstash.conf"
  pipeline.batch.size: 50
  pipeline.batch.delay: 20
- pipeline.id: kafka
  path.config: "pipeline/kafka.conf"
  pipeline.batch.size: 30
  pipeline.batch.delay: 20
The log shows that both pipelines and their configs start fine:
2023-10-09 16:43:18 [2023-10-09T13:43:18,576][INFO ][logstash.javapipeline ][http] Pipeline started {"pipeline.id"=>"http"}
2023-10-09 16:43:18 [2023-10-09T13:43:18,578][INFO ][logstash.inputs.http ][http][4c5283db339dddc365c0a40435a5a0ee2576b7d7cf43baf9b807df506847db91] Starting http input listener {:address=>"0.0.0.0:8080", :ssl=>"false"}
2023-10-09 16:43:18 [2023-10-09T13:43:18,590][INFO ][logstash.agent ] Pipelines running {:count=>2, :running_pipelines=>[:http, :kafka], :non_running_pipelines=>[]}
2023-10-09 16:43:26 [2023-10-09T13:43:26,528][WARN ][org.apache.kafka.clients.ClientUtils][kafka][7cb9977aa7dcf217b367ddef40559cfeb515482dc4b00bec4f923a2bf7096339] Couldn't resolve server kafka:29092 from bootstrap.servers as DNS resolution failed for kafka
As you can see, right after that warning Logstash throws an error saying it cannot create the Kafka consumer, and then the same error again:
2023-10-09 16:43:26 [2023-10-09T13:43:26,658][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<ArgumentError: wrong number of arguments (given 2, expected 0..1)>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/i18n-1.14.1/lib/i18n.rb:210:in `translate'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:427:in `inputworker'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405:in `block in start_input'"]}
2023-10-09 16:43:26 [2023-10-09T13:43:26,666][ERROR][logstash.agent ] Exception handling servers {:error=>#<IOError: closed stream>, :req=>nil}
2023-10-09 16:43:26 [2023-10-09T13:43:26,835][INFO ][logstash.javapipeline ][kafka] Pipeline terminated {"pipeline.id"=>"kafka"}
2023-10-09 16:43:26 [2023-10-09T13:43:26,835][INFO ][logstash.javapipeline ][http] Pipeline terminated {"pipeline.id"=>"http"}
and again it terminates everything.
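For context on the DNS warning: as far as I understand, kafka:29092 can only resolve if the Logstash container shares a Docker network with a service named kafka. My Compose setup is roughly like this (a trimmed sketch, not my exact file; image tags and listener settings here are illustrative):

services:
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    environment:
      # the advertised listener must match the address consumers dial
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
  logstash:
    image: docker.elastic.co/logstash/logstash:8.10.2
    depends_on:
      - kafka
    # both services sit on the default Compose network, so the
    # hostname "kafka" should resolve from inside the logstash container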
By the way, this is kafka.conf:
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ["kafka-activity-log"]
    codec => json {
      target => "document" # Parse the JSON into the [document] field
    }
  }
}
filter {
  json {
    source => "message"
    target => "parsed_json"
  }
  if "_jsonparsefailure" not in [tags] {
    mutate {
      remove_field => ["message", "@version", "event", "user_agent"]
    }
    # route each event to an index based on which fields are missing
    ruby {
      code => '
        empty_fields = 0
        ["user_id", "service_name", "activity_type", "error"].each do |field|
          if event.get(field).to_s.empty?
            empty_fields += 1
          end
        end
        if empty_fields >= 2
          event.set("[@metadata][index]", "user_activity_log_missing_data")
        elsif event.get("[user_id]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_user_id")
        elsif event.get("[service_name]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_service_name")
        elsif event.get("[activity_type]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_activity_type")
        elsif event.get("[error.message]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_error_message")
        else
          event.set("[@metadata][index]", "user_activity_log")
        end
      '
    }
  } else {
    # events that failed JSON parsing go to a separate index
    ruby {
      code => '
        event.set("[@metadata][index]", "user_activity_log_invalid_data")
      '
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "%{[@metadata][index]}"
    manage_template => true
    action => "index"
  }
  stdout { codec => rubydebug }
}
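To spell out what the ruby block is meant to do, here is how a hypothetical event would be routed (all values made up):

# hypothetical parsed event
user_id: ""
service_name: "auth-service"
activity_type: "login"
error: "connection timeout"
# only user_id is empty, so empty_fields == 1 and the elsif chain
# sets [@metadata][index] to "user_activity_log_empty_user_id";
# if error were empty too, empty_fields would be 2 and the event
# would go to "user_activity_log_missing_data" instead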
Is this Logstash's typical behaviour? Does it terminate the other pipelines when any one pipeline isn't working?