Configured Logstash multiple pipelines, but they are not working. Need help

Hello,

I am facing an issue with Logstash and have tried different approaches with no luck; any help is really appreciated. The version I am using is 7.3.
I have already gone through https://www.elastic.co/guide/en/logstash/7.x/multiple-pipelines.html and other discussions on this topic.

Basically, I am trying to run several different Logstash configs. Initially I created 3 separate config files, one for each source, put them into a folder, and updated path.config in logstash.yml. All 3 ports from the 3 config files are listening, but when I send requests to all 3 ports only one actually works: nothing arrives at the other 2, and only one index is updated.
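That first attempt pointed path.config in logstash.yml at the folder, roughly like this (the glob is illustrative, not my exact line):

path.config: "/appdata/ELK/logstash/conf/*.conf"
# note: with a glob like this, all matching files are concatenated into a
# single pipeline, so every event flows through every filter and output
# unless guarded by conditionals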
So I tried multiple pipelines instead (with and without tags in the config files). Still no luck and the same result: only one index is updated, and nothing reaches the other 2 even though the ports are listening.
I captured tcpdump as well and can see the requests arriving on the correct ports.
Note: I cannot change the type in the config files, because it will always be syslog.
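Since the type has to stay syslog everywhere, I am relying on per-input tags to tell the streams apart (see the conf files below); as I understand it, a per-input marker field should work the same way, e.g.:

input {
  tcp {
    port => 5544
    type => syslog
    # hypothetical marker field standing in for type, which must stay syslog
    add_field => { "proxy_source" => "BESLPROXY" }
  }
}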

More details below. Please let me know if I am missing anything here.

[2019-12-10T10:30:04,838][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2019-12-10T10:30:04,893][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-12-10T10:30:04,899][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-12-10T10:30:04,903][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-12-10T10:30:04,987][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.1-java/vendor/GeoLite2-City.mmdb"}
[2019-12-10T10:30:04,998][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.1-java/vendor/GeoLite2-City.mmdb"}
[2019-12-10T10:30:04,998][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.1-java/vendor/GeoLite2-City.mmdb"}
[2019-12-10T10:30:05,075][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2019-12-10T10:30:05,077][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2019-12-10T10:30:05,077][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2019-12-10T10:30:05,079][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"amproxy", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x106bb2ed@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:38 run>"}
[2019-12-10T10:30:05,079][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"cisproxy", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x9b1641f run>"}
[2019-12-10T10:30:05,080][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"beslproxy", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x5cf1bdde@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:37 run>"}
[2019-12-10T10:30:05,337][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"amproxy"}
[2019-12-10T10:30:05,337][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"beslproxy"}
[2019-12-10T10:30:05,344][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"cisproxy"}
[2019-12-10T10:30:05,361][INFO ][logstash.inputs.tcp ] Starting tcp input listener {:address=>"0.0.0.0:5542", :ssl_enable=>"false"}
[2019-12-10T10:30:05,362][INFO ][logstash.inputs.tcp ] Starting tcp input listener {:address=>"0.0.0.0:5544", :ssl_enable=>"false"}
[2019-12-10T10:30:05,389][INFO ][logstash.inputs.tcp ] Starting tcp input listener {:address=>"0.0.0.0:5543", :ssl_enable=>"false"}
[2019-12-10T10:30:05,436][INFO ][logstash.agent ] Pipelines running {:count=>3, :running_pipelines=>[:beslproxy, :amproxy, :cisproxy], :non_running_pipelines=>[]}
[2019-12-10T10:30:05,631][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

Pipeline Settings in /etc/logstash/logstash.yml

pipeline.id: main
pipeline.workers: 2
pipeline.batch.size: 125
path.config:

Contents of /etc/logstash/pipelines.yml:

- pipeline.id: beslproxy
  path.config: "/appdata/ELK/logstash/conf/logstash_BESL.conf"

- pipeline.id: amproxy
  path.config: "/appdata/ELK/logstash/conf/logstash_AM.conf"

- pipeline.id: cisproxy
  path.config: "/appdata/ELK/logstash/conf/logstash_CIS.conf"


Here are my Logstash conf files (the ownership and permissions are the same for all 3 conf files):

logstash_BESL.conf

input {
  tcp {
    port => 5544
    type => syslog
    tags => ["BESLPROXY"]
    # Theoretically this tag is not required, but trying with it too
  }
}

filter {
  if "BESLPROXY" in [tags] {
    grok {
      match => [REMOVED]
      remove_field => ["port", "_id"]
    }
    mutate {
      convert => {
        "statusCode" => "integer"
        "systemTimestamp" => "integer"
        "targetResponseTimeInMillis" => "integer"
        "requestProcessingTimeInMillis" => "integer"
      }
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
    }
    geoip {
      source => "sourceIP"
    }
  }
}

output {
  if "BESLPROXY" in [tags] {
    if [apiproxy] == "VALUEX" or [apiproxy] == "VALUEY" or [apiproxy] == "VALUEX" or [apiproxy] == "VALUEAB" {
      elasticsearch {
        hosts => ["SERVER:9200"]
        user => "USERNAME"
        password => "PASSWORD"
        index => "logstash-besl-%{+YYYY.MM.dd}"
      }
      stdout { codec => rubydebug }
    }
  }
}


logstash_AM.conf

input {
  tcp {
    port => 5543
    type => syslog
    tags => ["AppointmentManagerProxy"]
    # Theoretically this tag is not required, but trying with it too
  }
}

filter {
  if "AppointmentManagerProxy" in [tags] {
    grok {
      match => [REMOVED]
      remove_field => ["port", "_id"]
    }
    mutate {
      convert => {
        "statusCode" => "integer"
        "systemTimestamp" => "integer"
        "targetResponseTimeInMillis" => "integer"
        "requestProcessingTimeInMillis" => "integer"
      }
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
    }
    geoip {
      source => "sourceIP"
    }
  }
}

output {
  if "AppointmentManagerProxy" in [tags] {
    if [apiproxy] == "AppointmentManager" {
      elasticsearch {
        hosts => ["SERVER:9200"]
        user => "USERNAME"
        password => "PASSWORD"
        index => "logstash-am-%{+YYYY.MM.dd}"
      }
      stdout { codec => rubydebug }
    }
  }
}


logstash_CIS.conf

input {
  tcp {
    port => 5542
    type => syslog
    tags => ["CISPROXY"]
    # Theoretically this tag is not required, but trying with it too
  }
}

filter {
  if "CISPROXY" in [tags] {
    grok {
      match => [REMOVED]
      remove_field => ["port", "_id"]
    }
    mutate {
      convert => {
        "statusCode" => "integer"
        "systemTimestamp" => "integer"
        "targetResponseTimeInMillis" => "integer"
        "requestProcessingTimeInMillis" => "integer"
        "tin_last4" => "integer"
      }
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
    }
    geoip {
      source => "sourceIP"
    }
  }
}

output {
  if "CISPROXY" in [tags] {
    if [apiproxy] == "CIS" {
      elasticsearch {
        hosts => ["SERVER:9200"]
        user => "USERNAME"
        password => "PASSWORD"
        index => "logstash-cis-%{+YYYY.MM.dd}"
      }
      stdout { codec => rubydebug }
    }
  }
}

Your writes to elasticsearch are conditional upon the value of apiproxy. Maybe that does not have the value you think it does.
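One quick way to verify is to add an unconditional stdout at the top of the output section, outside every if block, so you can see the raw events with their tags and apiproxy values. A minimal sketch (shown here for the BESL config, but the same applies to the others):

output {
  # temporary debug output: prints every event regardless of conditionals,
  # so you can inspect [tags] and [apiproxy] as Logstash actually sees them
  stdout { codec => rubydebug }

  if "BESLPROXY" in [tags] {
    # ... existing conditional elasticsearch output ...
  }
}

Remove it once you have confirmed what the events actually contain.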

Hello,

All the conditions are satisfied, meaning the values sent from the source to Logstash are correct. I confirmed this by running each Logstash conf file individually.

The pipelines.yml file is always tricky.

Even a stray space can break it; I had a lot of trouble with it in the past and never figured out exactly what the cause was. Create a new file and type everything into it by hand, and do not cut-and-paste anything into that file. Hopefully that will fix the problem.
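For what it's worth, pipelines.yml is indentation-sensitive YAML, so here is a sketch of the layout that should parse cleanly (using the paths from your post):

# use spaces only, never tabs; path.config must be indented under pipeline.id
- pipeline.id: beslproxy
  path.config: "/appdata/ELK/logstash/conf/logstash_BESL.conf"
- pipeline.id: amproxy
  path.config: "/appdata/ELK/logstash/conf/logstash_AM.conf"
- pipeline.id: cisproxy
  path.config: "/appdata/ELK/logstash/conf/logstash_CIS.conf"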

Hi

Maybe you have already tried it, but I'll give it a shot.

I have a single pipeline with several .conf files. Each file has its own input {} with a unique tag and a unique id, and everything works like a charm. I see your inputs don't have an id.
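Something like this per input, where the id string is just an example name I made up:

input {
  tcp {
    # a unique, human-readable id makes this plugin instance easy to tell
    # apart in the logs and the monitoring API (the value is arbitrary)
    id => "besl_tcp_input"
    port => 5544
    type => syslog
    tags => ["BESLPROXY"]
  }
}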

If you have them in separate pipelines you won't need either tags or id, since they don't "see" each other, but this setup seems to add more complexity, and there might be some error in your pipeline configuration, as already pointed out by others.

I don't use the tcp input, but I figure it must work just the same with the setup I described.

Hope this helps.
