Dynamic naming of Elasticsearch data streams

Hi,

I'm trying to have some dynamic naming for my data streams based on some syslog fields.

Here is my rsyslog conf file for sending data in JSON format to Logstash.

# cat logstash-json.conf
template(name="json-template"
        type="list"
        option.json="on") {
                constant(value="{")
                constant(value="\"@timestamp\":\"")     property(name="timereported" dateFormat="rfc3339")
                constant(value="\",\"@version\":\"1")
                constant(value="\",\"message\":\"")     property(name="msg")
                constant(value="\",\"hostname\":\"")    property(name="hostname")
                constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
                constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
                constant(value="\",\"programname\":\"") property(name="programname")
                constant(value="\",\"procid\":\"")      property(name="procid")
                constant(value="\"}\n")
}

action(type="omfwd" target="elkglbvprd1" port="10514" protocol="tcp" template="json-template")

And here is my logstash conf file:

input {
        tcp {
                port => 10514
                codec => "json_lines"
                type => "syslog"
        }
}

filter {
        mutate {
                rename => { "hostname" => "[host][name]" }
                add_field => { "[@metadata][dataset]" => "%{facility}" }
                add_field => { "[@metadata][namespace]" => "%{programname}" }
        }

        if [facility] == "authpriv" {
                grok {
                        match => { "message" => "pam_unix\(%{GREEDYDATA}\): session %{WORD:session_state} for user %{USERNAME:user}(\(uid=%{INT:uid}\) by (%{USERNAME:ruser})?\(uid=%{INT:ruid}\))?" }

                }
        }
}

output {
#       stdout { codec => rubydebug }
        elasticsearch {
                hosts => "https://localhost:9200"
                ssl => true
                cacert => "/etc/logstash/certs/http_ca.crt"
                user => "logstash_writer"
                password => "myLogstashPassword"
                data_stream => "true"
                data_stream_type => "logs"
                data_stream_dataset => "%{ [@metadata][dataset] }"
                data_stream_namespace => "%{ [@metadata][namespace] }"
        }
}

EDIT: The error is

[ERROR] 2023-02-10 16:25:32.891 [Converge PipelineAction::Create<main>] elasticsearch - Invalid setting for elasticsearch output plugin:

  output {
    elasticsearch {
      # This setting must be a dataset_identifier
      # Invalid characters detected ["\\", "/", "*", "?", "\"", "<", ">", "|", " ", ",", "#", ":"] are not allowed
      data_stream_dataset => "%{ [@metadata][dataset] }"
      ...
    }
  }

Am I doing something wrong or is it just not possible?
Maybe someone here can share their thoughts.

Regards

Try removing the spaces; spaces are invalid characters.

Invalid characters detected ["\\", "/", "*", "?", "\"", "<", ">", "|", " ", ",", "#", ":"] are not allowed
data_stream_dataset => "%{ [@metadata][dataset] }"

Try to use:

data_stream_dataset => "%{[@metadata][dataset]}"

Hi,

Unfortunately, that doesn't work with either

data_stream_dataset => "%{[@metadata][dataset]}"

or

data_stream_dataset => "%{facility}"

[WARN ] 2023-02-10 17:14:04.698 [[main]>worker3] elasticsearch - Badly formatted index, after interpolation still contains placeholder: [logs-%{[@metadata][dataset]}-prod]; ...

[WARN ] 2023-02-10 16:36:50.734 [[main]>worker1] elasticsearch - Badly formatted index, after interpolation still contains placeholder: [logs-%{facility}-prod]; ...

Found this issue which seems to talk about it...
Use metadata for data_stream_auto_routing · Issue #13528 · elastic/logstash · GitHub

I'm not sure this is the same issue.

The error message says:

elasticsearch - Badly formatted index, after interpolation still contains placeholder: [logs-%{facility}-prod]

Do you have the fields facility and [@metadata][dataset] in all documents?

Can you share an example of a document you have in Logstash? Use a stdout output to get it.
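
For example (the metadata option is needed for rubydebug to print the [@metadata] fields):

output {
        stdout { codec => rubydebug { metadata => true } }
}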

It is. The elasticsearch output does not sprintf the datastream options. If you want them to be dynamic then use auto_routing and it will take them from fields on the [data_stream] object in the event.

Since [data_stream][dataset] will be the same for every document in the same datastream, it can be mapped as a constant_keyword, and the index template can tell elasticsearch not to store it with each document, so it does not waste storage.
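
A minimal sketch of what the output side could look like with auto routing (auth settings omitted; data_stream_auto_routing is the plugin option and already defaults to true):

output {
        elasticsearch {
                hosts => "https://localhost:9200"
                data_stream => "true"
                # with auto routing, the target data stream name is built from the event's
                # [data_stream][type], [data_stream][dataset] and [data_stream][namespace] fields
                data_stream_auto_routing => true
        }
}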

Using [@metadata] is definitely the "logstash way" of designing something like this, but I don't think that counts for much at Elastic :smiley:

Thanks for the information. That was my guess...

[data_stream][dataset] and [data_stream][namespace] will not be the same for every document, as the "facility" field of syslog can have multiple values (e.g. daemon, auth, ...); same for "programname".
Here, the purpose is to group logs by category and program. But storage space isn't a problem at the moment.

As I'm relatively new to ELK, I still need to understand the solution you are describing. I could use more info if you have some.

Regards

Agreed, but for every document in a particular datastream they will be the same, since the name of that datastream is formed from [data_stream][dataset] and [data_stream][namespace]. If [data_stream][dataset] has a different value then the document will go to a different datastream.

Note that this could result in a large number of small datastreams, and having lots of small indexes is not recommended.

We have a lot of servers ready to send logs of the same type, and I can play with ILM to avoid small indexes :wink:

Could you give me an example configuration with my data stream?

{
         "severity" => "info",
       "@timestamp" => 2023-02-10T15:11:50.599484Z,
        "@metadata" => {
        "namespace" => "sudo",
            "input" => {
            "tcp" => {
                "source" => {
                    "name" => "172.18.0.123",
                      "ip" => "172.18.0.123",
                    "port" => 40536
                }
            }
        },
          "dataset" => "authpriv"
    },
           "procid" => "-",
             "host" => {
        "name" => "lotrvprd1"
    },
      "programname" => "sudo",
         "@version" => "1",
             "type" => "syslog",
          "message" => " pam_unix(sudo:session): session closed for user root",
    "session_state" => "closed",
         "facility" => "authpriv",
             "user" => "root"
}
{
       "severity" => "info",
     "@timestamp" => 2023-02-10T15:11:51.623747Z,
      "@metadata" => {
        "namespace" => "systemd",
            "input" => {
            "tcp" => {
                "source" => {
                    "name" => "172.18.0.123",
                      "ip" => "172.18.0.123",
                    "port" => 40536
                }
            }
        },
          "dataset" => "daemon"
    },
    "programname" => "systemd",
         "procid" => "1",
           "host" => {
        "name" => "lotrvprd1"
    },
       "@version" => "1",
           "type" => "syslog",
        "message" => " Starting SSSD Sudo Service responder...",
       "facility" => "daemon"
}

Thanks for your help

I am suggesting something like

mutate {
    add_field => { 
        "[data_stream][dataset]" => "%{facility}"
        "[data_stream][namespace]" => "%{programname}"
        "[data_stream][type]" => "logs"
   }
}

Hi Badger!

Nice! It works like a charm :wink:

Here is the final configuration file:

input {
        tcp {
                port => 10514
                codec => "json_lines"
                type => "syslog"
        }
}

filter {
        mutate {
                rename => { "hostname" => "[host][name]" }
                add_field => {
                        "[data_stream][type]" => "logs"
                        "[data_stream][dataset]" => "%{facility}"
                        "[data_stream][namespace]" => "%{programname}"
                }
        }
}

output {
#       stdout { codec => rubydebug { metadata => true } }

        elasticsearch {
                hosts => "https://localhost:9200"
                ssl => true
                cacert => "/etc/logstash/certs/http_ca.crt"
                user => "logstash_writer"
                password => "myLogstashPassword"
                data_stream => "true"
        }
}

Thanks for your help.

Have a nice day.

Regards.

Hi,

Unfortunately, I still have a problem with some Windows facilities exported by NXLog, as some names use special characters like "/" (e.g. Windows/DNS) which cannot be used in data stream dataset names.

I tried to replace those "/" with "_", but I cannot make it work.

Here are my test conf files:

input {
        file {
                type => json
                path => "/etc/logstash/conf.d/test.log"
                start_position => "beginning"
                sincedb_path => "/dev/null"
        }
}


filter {
        # Create a correct datastream for elasticsearch
        mutate {
                rename => { "hostname" => "[host][name]" }
                rename => { "Hostname" => "[host][name]" }
        }

        mutate {
                add_field => { "[@metadata][facility]" => "%{facility}" }
                gsub => [ "[@metadata][facility]", "/", "_" ]
        }

        mutate {
                add_field => {
                        "[data_stream][type]" => "logs"
                        "[data_stream][dataset]" => "%{[@metadata][facility]}"
                }
        }
}

output {
        stdout { codec => rubydebug { metadata => true } }
}

And the test file...

{ "@timestamp\":"2023-02-23", "@version":"1", "destination":"all", "message":"MyMessage", "hostname":"myHostame", "severity":"mySeverity", "facility":"Machine/Windows", "programname":"MyProgram", "procid":"000" }

The result is:

{
     "@timestamp" => 2023-02-28T14:23:38.527856572Z,
            "log" => {
        "file" => {
            "path" => "/etc/logstash/conf.d/test.log"
        }
    },
    "data_stream" => {
           "type" => "logs",
        "dataset" => "%{facility}"
    },
      "@metadata" => {
        "facility" => "%{facility}",
            "host" => "elkglbvprd1",
            "path" => "/etc/logstash/conf.d/test.log"
    },
       "@version" => "1",
           "host" => {
        "name" => "elkglbvprd1"
    },
           "type" => "json",
        "message" => "{ \"@timestamp\\\":\"2023-02-23\", \"@version\":\"1\", \"destination\":\"all\", \"message\":\"MyMessage\", \"hostname\":\"myHostame\", \"severity\":\"mySeverity\", \"facility\":\"Machine/Windows\", \"programname\":\"MyProgram\", \"procid\":\"000\" }",
          "event" => {
        "original" => "{ \"@timestamp\\\":\"2023-02-23\", \"@version\":\"1\", \"destination\":\"all\", \"message\":\"MyMessage\", \"hostname\":\"myHostame\", \"severity\":\"mySeverity\", \"facility\":\"Machine/Windows\", \"programname\":\"MyProgram\", \"procid\":\"000\" }"
    }
}

What am I missing here?

You are not parsing your json message, so you won't have a field named facility.

You need to add a json filter to parse your message.

Having the literal %{facility} as the field value indicates that the field does not exist in your document.
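
A minimal sketch of such a filter, assuming the whole JSON document is in the message field:

filter {
        json {
                source => "message"
        }
}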

Also, this won't work:


        mutate {
                add_field => { "[@metadata][facility]" => "%{facility}" }
                gsub => [ "[@metadata][facility]", "/", "_" ]
        }

The gsub would be executed before the add_field; you need to break this into two mutates.

mutate {
        gsub => [ "facility", "/", "_" ]
}
mutate {
        add_field => { "[@metadata][facility]" => "%{facility}" }
}

Here is the correct filter:

filter {
        # Create a correct datastream for elasticsearch
        mutate {
                rename => { "hostname" => "[host][name]" }
                rename => { "Hostname" => "[host][name]" }
        }

        mutate {
                gsub => [ "facility", "/", "-" ]
                gsub => [ "facility", " ", "_" ]
        }

        mutate {
                add_field => {
                        "[data_stream][type]" => "logs"
                        "[data_stream][dataset]" => "%{facility}"
                }
        }
}
