Logstash sending all data to elasticsearch via wrong pipeline

Hello everyone.

I'm having an issue with pipelines; I think I caused the problem myself, but I can't find where it is. I have two servers: one running ELK and Filebeat, the other running Apache and Filebeat.

What I am trying to do is to get all the logs from /var/log/messages into one pipeline, and the ones from /var/log/httpd/access_log into another. My goal is to have two separate indexes.
But unfortunately, all the logs are still going through the first pipeline and not the second one.

Here are the config files:

pipe1.conf

#######
#INPUT#
#######

input {
  beats {
    # port Logstash listens on
    port => 5044
    # no certificate authentication
    ssl => false
  }
  file {
    path => "/var/log/messages"
  }
}

########
#FILTER#
########

filter {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{WORD:process}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"  }
    }

    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
}

########
#OUTPUT#
########

output {
  elasticsearch {
    hosts => "localhost"
    index => "indextest"
  }
  stdout {
    codec => rubydebug
  }
}

pipe2.conf

#######
#INPUT#
#######

input {
  file {
    path => "/var/log/httpd/access_log"
  }
  beats {
    # port Logstash listens on
    port => 5044
    # no certificate authentication
    ssl => false
  }
}

########
#FILTER#
########

filter {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
}



########
#OUTPUT#
########

output {
  elasticsearch {
    hosts => "localhost"
    index => "apache"
  }
  stdout {
    codec => rubydebug
  }
}

I looked at the Logstash logs and everything is running fine.
It's surely a conditional problem.

@Wellguys, if you have just separated your Logstash configuration into two different files, you aren't running in two separate pipelines. You have to configure that in your pipelines.yml file. If not, all your Logstash *.conf files are concatenated together and run in a single pipeline. That's almost certainly why you're seeing logs in the other ES index.

See here for an example.

That seems unlikely. You have two beats inputs listening on port 5044. One of them should be getting "Error: Address already in use".

You might want to run with "--config.test_and_exit --log.level debug" to verify it is picking up the configuration that you expect.
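For reference, `config.test_and_exit` and `log.level` are command-line flags, not logstash.yml settings. On CentOS 7 with an RPM install, a one-off check might look like the following (paths are the RPM defaults; adjust them for your setup):

```shell
# Parse the pipeline configuration, report any errors, and exit
# without starting the pipelines. Debug logging shows which config
# files were actually picked up.
sudo -u logstash /usr/share/logstash/bin/logstash \
  --path.settings /etc/logstash \
  --config.test_and_exit \
  --log.level debug
```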

My pipelines.yml is like this:

- pipeline.id: pipe1
  path.config: "/etc/logstash/conf.d/pipe1.conf"

- pipeline.id: pipe2
  path.config: "/etc/logstash/conf.d/pipe2.conf"

I restarted the service and it is true that my second pipeline is not running well. I can't find the index in Kibana either.

[2019-01-28][ERROR][logstash.pipeline] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:pipe2

I'm using CentOS 7 and I don't really know how to use "--config.test_and_exit --log.level debug". I enabled it in logstash.yml and restarted Logstash, but nothing seems to happen. I'm not getting any logs either.

I restarted the service normally again, and this time pipe1 is having a problem while pipe2 is taking all the logs.
Is it not possible to send all the data via one port, and then separate it depending on the log source file?

You cannot have two input plugins listening on the same port, which is why one of the pipelines always fails. You could, however, put the beats input in a third, separate pipeline and then use conditionals to direct the data to your other pipelines using pipeline-to-pipeline communication.


Oh yes, that's perfect, it works great with a third pipeline. I'll post all my configuration for people who might have the same problem. Thanks for the help, everyone!

pipelines.yml

- pipeline.id: analyzer
  path.config: "/etc/logstash/conf.d/analyzer.conf"

- pipeline.id: pipe1
  path.config: "/etc/logstash/conf.d/pipe1.conf"
- pipeline.id: pipe2
  path.config: "/etc/logstash/conf.d/pipe2.conf"

analyzer.conf

#INPUT#
input {
  beats {
    port => 5044
    ssl => false
  }
}

#OUTPUT#
output {
  if [source] == "/var/log/messages" {
    pipeline { send_to => pipe1 }
  }
  else if [source] == "/var/log/httpd/access_log" {
    pipeline { send_to => pipe2 }
  }
}
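Note: [source] is where Filebeat 6.x puts the path of the originating file. If you are on a newer Filebeat (7.x+), that field moved to [log][file][path], so the equivalent conditional would be:

```
output {
  if [log][file][path] == "/var/log/messages" {
    pipeline { send_to => pipe1 }
  }
  else if [log][file][path] == "/var/log/httpd/access_log" {
    pipeline { send_to => pipe2 }
  }
}
```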

pipe2.conf

#INPUT#
input {
  pipeline {
    address => pipe2
  }
}

#FILTER#
filter {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
}

#OUTPUT#
output {
  elasticsearch {
    hosts => "localhost"
    index => "apache"
  }
  stdout {
    codec => rubydebug
  }
}

pipe1.conf

#INPUT#
input {
  pipeline {
    address => pipe1
  }
}

#FILTER#
filter {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{WORD:process}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"  }
    }

    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
}

#OUTPUT#
output {
  elasticsearch {
    hosts => "localhost"
    index => "indextest"
  }
  stdout {
    codec => rubydebug
  }
}

Hello, I used your configuration for the pipeline, though I have two different Metricbeat instances on two different servers. So in analyzer.conf I have two beats inputs listening on different ports. I tried to send the output to different pipelines based on the port, but it doesn't work. Is it even possible to use
if [port] == "5045" { do something }? I am aiming to create different indices based on which server the logs come from. I also added tags to each beats input to route to a pipeline based on tag, but that doesn't work either; I can't see the new indices in Kibana.

If you want to create different indexes depending on the server, you should use something like if [beat][hostname]. From what I understand, the things in "[brackets]" are fields of the event, so it won't work with the port number. If you want to use two different ports, you only need two pipelines, each with a different input port.
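As a sketch (the hostnames and index names here are hypothetical; substitute your own, and note that [beat][hostname] is the Metricbeat 6.x field name), routing to per-server indexes could look like:

```
output {
  if [beat][hostname] == "server-a" {
    elasticsearch {
      hosts => "localhost"
      index => "metrics-server-a"
    }
  } else {
    elasticsearch {
      hosts => "localhost"
      index => "metrics-server-b"
    }
  }
}
```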

It doesn't work with the hostname. But I think the problem is that both servers write to the same registry. I will fix that and see if the indexing works.

It works now. Each server writes to a different registry and I can see the different indices in Kibana. Thanks, Wellguys!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.