Logstash Enrich and translate plugin use

Hello Team,

I am trying to enrich the data before it makes its way to Elasticsearch. I have tried the two methods below, but neither is currently working.

1. Using the elasticsearch filter plugin
input {
    kafka {
        bootstrap_servers => "xxx.xx.xx.xxx:9092"
        topics => ["topicname"]
    }
}
filter {
    elasticsearch {
        hosts => ["xxx.xx.xx.xxx:9200"]
        index => "lookup"
        query => '{ "query": { "match": { "id": "%{id}" } } }'
        fields => {
            "desc" => "desc"
        }
        user => "elastic"
        password => "******"
        ca_file => "/path/to/elasticsearch-ca.pem"
    }
    json {
        source => "message"
        remove_field => ["message", "@version"]
    }
    mutate {
        lowercase => ["name"]
    }
}
output {
    elasticsearch {
        hosts => ["xxx.xx.xx.xxx:9200"]
        index => "test"
        document_id => "%{id}"
        user => "elastic"
        password => "*******"
        ssl => true
        cacert => "/path/to/elasticsearch-ca.pem"
        ssl_certificate_verification => false
    }
}

The issue I ran into is that ssl_certificate_verification is not allowed as a parameter on the elasticsearch filter, so it is not connecting to Elasticsearch. I tried generating the certificate using the following link so that I could have a secure SSL connection, but that doesn't work either.
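For what it is worth, a minimal sketch of what I was hoping the filter would accept; the ssl_certificate_authorities and ssl_verification_mode option names are an assumption on my side, taken from the newer unified SSL settings, and need to be verified against the installed logstash-filter-elasticsearch version:

filter {
    elasticsearch {
        hosts => ["xxx.xx.xx.xxx:9200"]
        index => "lookup"
        query => '{ "query": { "match": { "id": "%{id}" } } }'
        fields => { "desc" => "desc" }
        user => "elastic"
        password => "******"
        # Assumed option names from the newer unified SSL settings; older
        # plugin versions only support ca_file and have no way to disable
        # certificate verification.
        ssl_certificate_authorities => ["/path/to/elasticsearch-ca.pem"]
        ssl_verification_mode => "none"
    }
}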

2. Next, I tried the translate filter to achieve the same result. I get no error, but the lookup against the dictionary is not happening.

input {
    kafka {
        bootstrap_servers => "xxx.xx.xx.xxx:9092"
        topics => ["topicname"]
    }
}
filter {
    translate {
        source => ["code"]
        target => ["desc"]
        dictionary => {
            "1" => "One"
            "2" => "two"
            "3" => "three"
            "4" => "four"
        }
        override => true
        fallback => "Fallback"
    }
    json {
        source => "message"
        remove_field => ["message", "@version"]
    }
    mutate {
        lowercase => ["name"]
    }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => ["xxx.xx.xx.xxx:9200"]
        index => "test"
        document_id => "%{id}"
        user => "elastic"
        password => "*********"
        ssl => true
        cacert => "/path/to/elasticsearch-ca.pem"
        ssl_certificate_verification => false
    }
}

The value of id was an integer, but the translate source field should be a string, so I converted it and tried again; there is no error, but the desc field is still blank.
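A minimal sketch of that conversion, assuming the translate source field is code:

filter {
    # Convert the numeric field to a string so it matches the dictionary
    # keys, which are strings.
    mutate {
        convert => { "code" => "string" }
    }
}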

Finally, I also set up the enrich index and configuration on the Elasticsearch side, but when I send data through Logstash I get DLQ issues.
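For reference, a minimal sketch of how the elasticsearch output can point at the ingest pipeline that runs the enrich processor; the pipeline name below is a placeholder, not my actual pipeline:

output {
    elasticsearch {
        hosts => ["xxx.xx.xx.xxx:9200"]
        index => "test"
        document_id => "%{id}"
        # Placeholder pipeline name; every document indexed through this
        # output is run through that ingest pipeline (and its enrich
        # processor) on the Elasticsearch side.
        pipeline => "my-enrich-pipeline"
    }
}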

Regards,
Guru


I am still not able to get it working. I was able to successfully set up the enrich pipeline and index template in Elasticsearch, and add records from Dev Tools with a PUT command, but going through Logstash is not working.

Any update on this would be helpful for debugging the issue with the enrich pipeline.

Can you provide more context? It is not clear what you are trying to do now, since you mentioned the translate filter and the elasticsearch filter in Logstash as well as an enrich pipeline in Elasticsearch, which are completely different things.

I would say that if you are using Logstash the easiest way to enrich data is using the translate filter.

You didn't share any sample message showing what your data looks like, nor any error log you are getting, so it is not possible to know why the translate filter didn't work.

Also, in the Logstash configuration you shared, you had your translate filter before your json filter. It needs to come after you parse your message, otherwise the source field will not exist and the translate filter will not work, as in the sketch below.
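A minimal sketch of the intended ordering, reusing the filters from your configuration:

filter {
    # Parse the Kafka message first so its fields exist on the event.
    json {
        source => "message"
        remove_field => ["message", "@version"]
    }
    # Only after parsing does the "code" field exist for translate to look up.
    translate {
        source => ["code"]
        target => ["desc"]
        dictionary => {
            "1" => "One"
            "2" => "two"
        }
        override => true
        fallback => "Fallback"
    }
}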

Hello @leandrojmp ,

Thanks a lot for pointing that out. I was able to get the translate filter working, and instead of an inline dictionary I am now using the dictionary_path option with a CSV file. However, I noticed that every time we update the values in the CSV file we need to restart Logstash; is that right?

Also, I am using multiple translate filters to perform multiple lookups.

input {
    kafka {
        bootstrap_servers => "xxx.xx.xx.xxx:9092"
        topics => ["topicname"]
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message", "@version"]
    }
    mutate {
        lowercase => ["name"]
    }
    translate {
        source => ["code"]
        target => ["desc"]
        dictionary => {
            "1" => "One"
            "2" => "two"
            "3" => "three"
            "4" => "four"
        }
        override => true
        fallback => "Fallback"
    }
    translate {
        source => ["code2"]
        target => ["desc2"]
        dictionary => {
            "1" => "Black"
            "2" => "Green"
            "3" => "Yellow"
            "4" => "Red"
        }
        override => true
        fallback => "Fallback"
    }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => ["xxx.xx.xx.xxx:9200"]
        index => "test"
        document_id => "%{id}"
        user => "elastic"
        password => "*********"
        ssl => true
        cacert => "/path/to/elasticsearch-ca.pem"
        ssl_certificate_verification => false
    }
}

Regards,
Guru

No, the dictionary is refreshed in memory. By default Logstash will check the dictionary file for changes and refresh it every 5 minutes, but you can decrease the refresh interval.
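A minimal sketch, assuming a two-column CSV dictionary at a placeholder path:

filter {
    translate {
        source => ["code"]
        target => ["desc"]
        # Placeholder path to a CSV of key,value pairs.
        dictionary_path => "/path/to/lookup.csv"
        # Re-check the file for changes every 60 seconds instead of the
        # default 300 seconds (5 minutes).
        refresh_interval => 60
        override => true
        fallback => "Fallback"
    }
}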

Thanks @leandrojmp, it does sync after 5 minutes.
