Badly formatted index

Hi team,

I have a Logstash pipeline that usually runs fine for 2 or 3 weeks at a time, but this time it ran for only 2 days, and this morning it returned this error:

Badly formatted index, after interpolation still contains placeholder: [idx_uat_zed_v1_p0340_s1158_%{[@metadata][input][type]}_custom]

I cannot figure out where the problem is located: is it in my pipeline, or in the messages parsed by my pipeline? I wonder if it happens during index creation.
I'm open to any ideas...

Thank you all!

Hi @Khaled_Saidi,

Can you share which version of Logstash you are using, along with your pipeline configuration? Looking at this similar issue, it might be down to a syntax error.

Let us know!

Hi Carly,
I saw the issue you are talking about, and I did not notice any syntax error.
We are running Logstash v8.x (not sure about the x).
Here is the pipeline:

input {
    kafka {
        bootstrap_servers => "XXXXXXXXXXXXXXXXX"
        decorate_events => true
        group_id => "alise.catas.zed-uat.group.alise"
        topics_pattern => "alise\.catas\.zed-uat\.logs\.v1-s1158.*"
        codec => "json"
        ssl_endpoint_identification_algorithm => ""
        ssl_truststore_location => "/usr/share/logstash/config/ssl/truststore.jks"
        ssl_truststore_password => "TrustPass%%"
        ssl_truststore_type => "jks"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${XXXXXXXXX_USERNAME}' password='${XXXXXXXXX_PASSWORD}';"
    }
}
filter {
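    # Only messages starting with "time=" are parsed; anything else passes through unchanged.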
    if [message] =~ /^time=/ {
        kv {
            source => "message"
            field_split => "|"
            value_split => "="
        }

        date {
            match => [ "time", "ISO8601" ]
            #timezone => "Europe/Paris"
            timezone => "+01:00"
            target => "@timestamp"
        }

        mutate {remove_field => [ "message" ]}

        if [log_message] =~ "type=" {
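            # Spark metric lines look like "type=..., name=<app>.<instance>.<metric>, agg1=v1, ..."
            # and are broken apart into structured fields below.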

            grok {
                match => { "log_message" => "type=%{WORD:[metric][kind]}, name=%{WORD:[spark][app_id]}.%{WORD:[spark][instance]}.%{NOTSPACE:metric_name}, %{GREEDYDATA:metric_data}" }
                add_field => { "[@metadata][input][type]" => "metrics" }
            }

            mutate {
                remove_field => [ "log_message", "level", "location", "thread" ]
            }

            split {
                field => "metric_data"
                terminator => ","
            }

            grok {
                match => { "metric_data" => "%{WORD:metric_agg}=%{NUMBER:[metric][value]}" }
                add_field => { "[metric][name]" => "%{metric_name}.%{metric_agg}" }
            }

            if [metric][name] =~ "DAGScheduler"
                or [metric][name] =~ "CodeGenerator"
                or [metric][name] =~ "HiveExternalCatalog"
                or [metric][name] =~ "executor.filesystem" {
                drop {}
            } else {
                mutate {
                    convert => { "[metric][value]" => "float" }
                    remove_field => [ "metric_data", "metric_agg", "metric_name" ]
                }

                grok {
                    match => { "[metric][name]" => "hbase.%{WORD:[hbase][table]}.%{WORD:[hbase][operation]}(.%{WORD:[hbase][family]})?.%{WORD:[metric][name]}.value" }
                    overwrite => [ "[metric][name]" ]
                }

                grok {
                    match => { "[metric][name]" => "spark.streaming.%{WORD:flux}.%{WORD:processus}.%{WORD:[metric][name]}.value" }
                    overwrite => [ "[metric][name]" ]
                }

                grok {
                    match => { "[metric][name]" => "%{WORD:flux}.%{WORD:processus}.%{TOPIC:[kafka][topic]}.%{WORD:[metric][name]}.%{WORD}" }
                    overwrite => [ "[metric][name]" ]
                    pattern_definitions => { "TOPIC" => "zed\.catas\.s[0-9]{4}-uat\.[^.]*\.v1-topic.*" }
                }
            }
        } else {
                mutate {
                    add_field => { "[@metadata][input][type]" => "messages" }
                }
        }
    }
}

output {
    elasticsearch {
        hosts => "XXXXXXXXXXXXXXXXX"
        user => "XXXXXXXXXXXXXXXXX"
        password => "XXXXXXXXXXXXXXXXX"
        ssl => true
        ssl_certificate_verification => false
        cacert => "/usr/share/logstash/config/ssl/ca.crt"
        action => "create"
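        # Index name relies on sprintf interpolation of [@metadata][input][type].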
        index => "idx_uat_zed_v1_p0340_s1158_%{[@metadata][input][type]}_custom"
    }
}

You are sending events to the output that do not have a [@metadata][input][type] field. Your filter section only adds that field when [message] =~ /^time=/, so I would guess you just started receiving events for which that is not true.
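One way to guard against that is to give the field a fallback value with an else branch on that outer conditional. A minimal sketch (the "other" value is just an example, pick whatever fits your naming scheme):

filter {
    if [message] =~ /^time=/ {
        # ... your existing kv / date / grok logic ...
    } else {
        # Fallback so the sprintf reference in the output always resolves.
        mutate {
            add_field => { "[@metadata][input][type]" => "other" }
        }
    }
}

With that in place, every event reaching the elasticsearch output has a value for the field, so the index name never contains a literal %{...} placeholder.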

Hi Badger,

You are right.
To test, I added the field with a fallback value in an else section.
For the moment, the error has not returned. I'm going to let it run for a few days and then close this topic.
Thanks for your help.

BR,
Khaled