Send data to the right pipeline from two CSV files

Hello,

I am trying to understand the workflow Filebeat ---> Logstash (beats input) ---> the right pipeline.

For example, say we have two CSV files with the same structure. Both data streams are sent to the beats input listening on port 5044. My question is how Logstash knows which pipeline it should forward the data to. Sorry if I missed something in the documentation.

BR,
Mladen

I’ll try to be more specific.

After adding a second source (/iib/syslogmqsi/iib.log) to filebeat.yml I get an error in my Logstash log. This is part of my filebeat.yml:

> - type: log
> 
>   # Change to true to enable this prospector configuration.
>   enabled: true
> 
>   # Paths that should be crawled and fetched. Glob based paths.
>   paths:
>     - /logs/monitoring/EG_monitoring_kaiibraz.log
>     - /iib/syslogmqsi/iib.log

I have two pipelines: one uses the csv filter and the other uses a grok filter.

This is my csv pipeline:

input {
    beats {
        port => "5044"
    }
}
# The filter section parses the CSV columns and the timestamp.
filter {
    csv {
        columns => [ "date_time", "cpu_utilization", "ram_utilization", "execution_group" ]
        separator => ","
    }
    mutate {convert => ["cpu_utilization", "float"] }
    mutate {convert => ["ram_utilization", "float"] }
    date {
        locale => "en"
        match => ["date_time", "dd-MM-yy;HH:mm:ss"]
        timezone => "Europe/Vienna"
        target => "@timestamp"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "kaiibraz"
    }
}

And this is my grok pipeline:

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
    match => { "message" => "^%{SYSLOGTIMESTAMP:DATE_TIME} %{HOSTNAME:HOSTNAME} %{WORD:SYSTEM}\[%{BASE10NUM:PID}]: IBM Integration Bus %{WORD} \(%{WORD:NODE}.%{WORD:EG}\) \[%{WORD} %{BASE10NUM}] \(%{WORD} %{NOTSPACE}\) %{WORD:CODE}: %{GREEDYDATA:MESSAGE}$" }
    }
    date {
        locale => "en"
        match => ["DATE_TIME", "MMM dd HH:mm:ss"]
        timezone => "Europe/Belgrade"
        target => "@timestamp"
    }
}
output {
    if "_grokparsefailure" in [tags] {
        # write events that didn't match to a file
        file { "path" => "/grok/kaiibraz/grok_log_filter_failures_kaiibraz.txt" }
    }
    else {
        elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "kaiibraz_log"
        }
    }
}

I realised that something was wrong when I found data from the CSV file in /grok/kaiibraz/grok_log_filter_failures_kaiibraz.txt :slight_smile:

BR,
Mladen

Unless you are using pipelines.yml, the configuration files are concatenated: events from each input are sent through every filter and written to every output. You could do something like this, with a different magicvalue in each file.

input {
  beats {
    port => "5044"
    add_field => { "[@metadata][somefield]" => "magicvalue" }
  }
}
filter {
  if "[@metadata][somefield]" == "magicvalue" {
    ...
  }
}
output {
  if "[@metadata][somefield]" == "magicvalue" {
    ...
  }
}

Thank you @Badger for pointing me in the right direction. I didn't have a clue what was happening in the background :slight_smile:. From this post I realised that every data stream is processed by every filter in all of the concatenated configuration files. If we want to control the flow we need to use conditionals. But what happens if we have, for example, 50 servers (50 Filebeat agents) :slight_smile:? What is the best practice in this situation?

Reading articles, I found that the problem with multiple log files can be solved using multiple instances of Filebeat. The post is from here. Could someone explain to me how this solves the problem?

Every prospector in a filebeat instance writes data to the same output. You can add a field to each prospector to identify the type of data (for example, I have a filebeat that collects J9GClog, G1GClog, and apacheaccess). Then in a single pipeline you can have filters that conditionally process each type of log.
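
As a sketch of that approach (the log_type field name and its values are just examples I am making up, not something from your setup), each prospector in filebeat.yml could tag its events:

- type: log
  paths:
    - /logs/monitoring/EG_monitoring_kaiibraz.log
  # extra field identifying the type of data (example name)
  fields:
    log_type: csv
  fields_under_root: true

- type: log
  paths:
    - /iib/syslogmqsi/iib.log
  fields:
    log_type: iib
  fields_under_root: true

and a single Logstash pipeline could branch on that field:

input {
    beats {
        port => "5044"
    }
}
filter {
    if [log_type] == "csv" {
        # your existing csv / mutate / date filters go here
        ...
    } else if [log_type] == "iib" {
        # your existing grok / date filters go here
        ...
    }
}
output {
    if [log_type] == "csv" {
        elasticsearch { hosts => [ "localhost:9200" ] index => "kaiibraz" }
    } else {
        elasticsearch { hosts => [ "localhost:9200" ] index => "kaiibraz_log" }
    }
}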

If you use multiple instances of filebeat, then each one can write to a different output. So, for example, I could send J9GClog to localhost:5044, G1GClog to localhost:5045, and apacheaccess to localhost:5046. Then I would run three pipelines, each with a beat input on a different port, each processing a single type of log with no conditionals.
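
As a sketch of that layout (pipeline ids, paths, and ports are illustrative), pipelines.yml keeps the three configurations isolated, so none of them needs conditionals:

- pipeline.id: j9gclog
  path.config: "/etc/logstash/conf.d/j9gclog.conf"
- pipeline.id: g1gclog
  path.config: "/etc/logstash/conf.d/g1gclog.conf"
- pipeline.id: apacheaccess
  path.config: "/etc/logstash/conf.d/apacheaccess.conf"

Each .conf file has its own beats input (on 5044, 5045, and 5046 respectively), and each filebeat instance points at the matching port in its output section:

output.logstash:
  hosts: ["localhost:5045"]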

Another option is to use a single beat, then use an extra pipeline to do the routing. This might be a big if-else if-else if-else if to route to (for example) tcp inputs on different ports. Or use a translate filter to do the mapping. There are many other ways to dress it up, but the if-else is always there.
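
A sketch of that routing pipeline, assuming (hypothetically) that the events already carry a log_type field and that the downstream pipelines have tcp inputs with codec => json_lines on ports 5045 and 5046:

input {
    beats {
        port => "5044"
    }
}
output {
    if [log_type] == "csv" {
        tcp { host => "localhost" port => 5045 codec => json_lines }
    } else if [log_type] == "iib" {
        tcp { host => "localhost" port => 5046 codec => json_lines }
    } else {
        # anything of an unknown type goes to a file for inspection
        file { path => "/tmp/unrouted_events.txt" }
    }
}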


Thanks a lot for your comments and examples. I now have a better understanding of what is happening during the whole process :slight_smile:.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.