Using a Single Logstash template.conf Across Multiple Pipelines with Dynamic Variable Replacement

I need to create 1000 pipelines that all use the same file, template.conf, as a template.

For example, in the pipelines.yml file I have these pipelines:

- pipeline.id: pip_01
  path.config: "/etc/logstash/conf.d/template.conf"
  var.logfile: "prod/cs/file01.log"
  var.index_name: "file01"

- pipeline.id: pip_02
  path.config: "/etc/logstash/conf.d/template.conf"
  var.logfile: "prod/cs/file02.log"
  var.index_name: "file02"

The var.logfile and var.index_name fields are custom settings; the idea is that they would replace the placeholders ${logfile} and ${index_name} in the template.conf file.

Here is the template.conf file:

input {
    google_cloud_storage {
        bucket_id      => "my-bucket"
        interval       => 5
        json_key_file  => "/etc/gcloud/credencial.json"
        file_matches   => "${logfile}"   # <<< value of var.logfile here
    }
}

filter {
    # Use grok to process log entries
    grok {
        match => {
            "message" => "\[%{LOGLEVEL:severity}\s*\] %{DATESTAMP:timestamp} %{DATA:origin} - %{GREEDYDATA:msg}"
        }
        pattern_definitions => {
            "LOGLEVEL" => "INFO|WARN|ERROR"
        }
    }
}

output {
    elasticsearch {
        hosts        => ["https://my_es_host:443"]
        user         => "user"
        password     => "pwd"
        index        => "${index_name}"   # <<< value of var.index_name here
        ssl_enabled  => true
    }
}

Is this possible?
Can I use one template.conf for all the pipelines and substitute only the values I need, so that I can reuse the content of template.conf without duplicating it for every pipeline?

Hello and welcome,

No, this is not possible in Logstash. You cannot reuse a configuration file with per-pipeline variables like that; you need to duplicate the configuration for each pipeline.

But you do not need to create thousands of files by hand: you can use something like Ansible to generate them for you, and even to generate their entries in pipelines.yml.
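
A rough sketch of how that could look with Ansible (the file names, variables, and paths here are just examples, adapt them to your environment). The Jinja2 template, say pipeline.conf.j2, is basically your template.conf with Ansible placeholders:

input {
    google_cloud_storage {
        bucket_id      => "my-bucket"
        interval       => 5
        json_key_file  => "/etc/gcloud/credencial.json"
        file_matches   => "{{ item.logfile }}"
    }
}

# filter { ... } stays exactly as in your template.conf

output {
    elasticsearch {
        hosts => ["https://my_es_host:443"]
        index => "{{ item.index_name }}"
    }
}

And a playbook task that renders one .conf file per entry in a list:

- name: Render one pipeline config per log file
  ansible.builtin.template:
    src: pipeline.conf.j2
    dest: "/etc/logstash/conf.d/{{ item.index_name }}.conf"
  loop:
    - { logfile: "prod/cs/file01.log", index_name: "file01" }
    - { logfile: "prod/cs/file02.log", index_name: "file02" }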

Also, 1000 pipelines in a single Logstash instance may not be ideal and can lead to performance issues.

Can you provide more context about your use case? Do you really need thousands of indices, one per file? That is not good practice.

Thank you for the feedback.

Each input file is a log file. They have the same format, and the naming convention is [username].log.
Each user has a log file.
I want to isolate each user into a separate index in Elasticsearch.

For example:
user01.log → index_user01
user02.log → index_user02
user03.log → index_user03
and so on...

The .log files are in GCP Cloud Storage and are updated in real time.
Logstash monitors new entries in each file and sends them to Elasticsearch.

What would be the best strategy?

  • A single pipeline for all 1000 files?
  • But how can I differentiate the indices in the output?
  • The only information I have to differentiate them is in the filename, not in the log content.

I tried the following approach, but it took too long to start collecting and sending data.

Input configuration:

file_matches => "prod/cs/*.log"

In the filter, extract the filename and remove the .log extension to use it as the index name:

grok {
    match => { "path" => ".*\/(?<index_name>[^\/]+)\.log$" }
}

In the output, set the dynamic index name based on the filename:

index => "%{index_name}"

Does this work?

  • Is this a good strategy?
  • Is there a better approach?

Thanks.

What is the content of these log files? Do they all have the same structure in the events?

If so, you could have just one index and add the filename as a field in the event using Logstash.

You have the path of the file, so you can extract the user information from it and add a new field like user_name with that information. This will allow you to filter the events by user.

The Logstash part works, but it is not a good strategy because you would still have one index per user. This does not scale: it can lead to a lot of shards in your cluster and a lot of small indices.

As mentioned, if the structure of the events is the same, you can use just one index and add the user information to the event before indexing the data, for example like the sketch below.
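
A minimal sketch of what I mean (the [@metadata][gcs][name] field is the object name set by the google_cloud_storage input, and "user-logs" is just an example index name, not a recommendation):

filter {
    # copy the object name (e.g. "prod/cs/user01.log") into a new field
    mutate {
        add_field => { "user_name" => "%{[@metadata][gcs][name]}" }
    }
    # strip the path prefix and the .log extension, leaving only the user name
    mutate {
        gsub => [ "user_name", "^.*/([^/]+)\.log$", "\1" ]
    }
}

output {
    elasticsearch {
        # a single index for all users; filter by the user_name field when querying
        index => "user-logs"
    }
}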

Hello Leandro!

Yes, the content of the files follows the same structure in the events. What changes is the file name, which is the user's name.

I'll use this strategy of creating an index and adding the user_name field.

I'm not sure which option is best.

Should I create a single pipeline to process all 1000 files, or one pipeline per file?

If I create a single .conf file for all input files, how should the file_matches configuration look?

For one file, it worked like this: file_matches => "prod/cs/file01.log"

But for all the files, should I use this? file_matches => "prod/cs/.*log"

This is the configuration I am using. I tested it with a single file, file01.log, and it worked, but I would like to pick up all the .log files.

input {
    google_cloud_storage {
        bucket_id => "my-bucket"
        interval => 5
        json_key_file => "/etc/gcloud/credencial.json"
        file_matches => "prod/cs/file01.log"
        codec => multiline {
            pattern => "^\[[A-Z]{4,5}\s*\]"
            negate => true
            what => "previous"
        }
    }
}

filter {
    mutate {
        # Adding the full file path as user_name
        add_field => { "user_name" => "%{[@metadata][gcs][name]}" }
    }

    # Use mutate to extract only the file name
    mutate {
        gsub => ["user_name", "^.*?/([^/]+)\.log$", "\1"]
    }
    
    # Use grok to process log entries
    grok {
        match => {
            "message" => "\[%{LOGLEVEL:severity}\s*\] %{DATESTAMP:timestamp} %{DATA:origin} - %{GREEDYDATA:msg}"
        }
        pattern_definitions => {
            "LOGLEVEL" => "INFO|WARN|ERROR"
        }
    }

    # Convert the timestamp to the desired format
    date {
        match => [ "timestamp", "dd/MM/yyyy HH:mm:ss.SSS" ]
        target => "@timestamp"
        remove_field => [ "timestamp" ]
    }

    mutate {
        remove_field => ["message", "host", "@version", "event", "tags", "path"]
    }
}

output {
    # Send to Elasticsearch
    elasticsearch {
        hosts => ["https://myhost.us-west1.gcp.cloud.es.io:443"]
        user => "logstash_writer"
        password => "123456"
        index => "index_logs"
        ssl_enabled => true
    }

    # Display output to the terminal for debugging
    stdout { codec => rubydebug }
}

Thanks

According to the documentation, yes, you can use wildcards in the file_matches option.
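
If I remember the docs correctly, file_matches is treated as a pattern match on the object name rather than a shell glob, so something like this should pick up every .log object under the prod/cs/ prefix (same input as yours, only the file_matches value changes; keep your multiline codec as it is):

input {
    google_cloud_storage {
        bucket_id     => "my-bucket"
        interval      => 5
        json_key_file => "/etc/gcloud/credencial.json"
        # regex-style pattern: match all .log objects under prod/cs/
        file_matches  => "prod/cs/.*\.log"
    }
}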

One thing that you mentioned is this: "The .log files are in GCP Cloud Storage and are updated in real time."

To be honest, I'm not sure this will work with this plugin, as the files are in object storage and the plugin is designed to download complete files, not files that are still being written.

If you are updating the files in object storage and expecting Logstash to pick up just the new lines, I don't think this will work.

Ok, thank you, Leandro.
I’ll test it.
If there are any updates, I'll open another discussion topic.
Thanks!