Logstash Kafka input - converting date to string format

Hello,

We are using the Kafka input plugin to feed data into Logstash.

One of the fields that comes with the message is in date format.

I need to convert that date into string format.

Please guide. My config file looks as follows:

input {
    kafka {
        topics => ["acg-cvr.fs_watch_events"]
        group_id => "JSWGRP"
        tags => ["JSwLogs"]
        bootstrap_servers => "broker:9093"
        security_protocol => "SSL"
        ssl_truststore_type => "JKS"
        ssl_truststore_location => "/opt/gtal/cacerts.jks"
        ssl_truststore_password => "xxxxx"
        codec => "json"
        decorate_events => true
    }

    kafka {
        topics => ["data.kafkametrics"]
        group_id => "KAFKINGEST"
        tags => ["KafkIngest"]
        bootstrap_servers => "broker:9093"
        security_protocol => "SSL"
        ssl_truststore_type => "JKS"
        ssl_truststore_location => "/opt/gtal/cacerts.jks"
        ssl_truststore_password => "xxxxx"
        codec => "json"
        decorate_events => true
    }

    kafka {
        topics => ["data.datacollectmetrics"]
        group_id => "KAFKCOLLECT"
        tags => ["KafkCollect"]
        bootstrap_servers => "broker:9093"
        security_protocol => "SSL"
        ssl_truststore_type => "JKS"
        ssl_truststore_location => "/opt/gtal/cacerts.jks"
        ssl_truststore_password => "xxxxx"
        codec => "json"
        decorate_events => true
    }
}

filter {
    mutate {
        strip => ["rdt"]
    }

    date {
        match => ["rdt", "yyyy-MM-dd'+'HH:mm:ss.SSS", "MMM dd HH:mm:ss", "ISO8601"]
    }

    mutate {
        convert => ["rdt", "string"]
    }
}

output {
    if "JSwLogs" in [tags] {
        elasticsearch {
            hosts => ["xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045"]
            user => "xxxxx"
            password => "xxxxx"
            index => "js_events-%{+YYYY.MM.dd}"
            manage_template => true
            template_overwrite => true
            template => "/opt/gtal/config/js_template.json"
            template_name => "js_watcher"
        }
    }

    if "KafkIngest" in [tags] {
        elasticsearch {
            hosts => ["xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045"]
            user => "xxxxx"
            password => "xxxxx"
            index => "data.kafkaingest-%{+YYYY.MM.dd}"
            manage_template => true
            template_overwrite => true
            template => "/opt/gtal/config/ingest_template.json"
            template_name => "dataING_template"
        }
    }

    if "KafkCollect" in [tags] {
        elasticsearch {
            hosts => ["xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045", "xx-xx-xxx.xx:43045"]
            user => "xxxxx"
            password => "xxxxx"
            index => "datacollect-%{+YYYY.MM.dd}"
            manage_template => true
            template_overwrite => true
            template => "/opt/gtal/config/collect_template.json"
            template_name => "dataCollect_template"
        }
    }
}

The field rdt comes in date format as part of the message from the data.datacollectmetrics topic, and I need to convert it to string format.

Please guide

Convert where? In Logstash or in Elasticsearch? Is the name of the field adt or rdt?

Your Logstash configuration does not reference any adt field.

Also, can you share a sample of your document?

Thanks @leandrojmp

I corrected the post; rdt is the field. It needs to be converted to a string in Logstash and persisted in Elasticsearch.

I will post the document shortly

If you need this field to be a string field in Elasticsearch, you need to map it as a keyword field before indexing it.

If you do not map it and Elasticsearch recognizes it as a date field, it will be mapped as a date field. It doesn't matter whether you converted it to a string in Logstash or not; what matters is the Elasticsearch mapping.

Thanks. Can you please point me to a document that describes how that is done?

You can start here about mapping. You will probably need to create an index template for your index if you don't have one; if you already have one, as it seems you do, you just need to create or change the mapping.
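
For example, a minimal legacy index template that maps rdt as a keyword could look like the sketch below. The index pattern and settings here are just placeholders based on the index name in your output config, so adjust them to your own values:

{
  "index_patterns": ["datacollect-*"],
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
      "rdt": { "type": "keyword" }
    }
  }
}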

Thanks @leandrojmp

My template looks as follows:

{ "template" : "dataCollect_template",
  "version" : 1,  
  "index_patterns": "tdata.kafkacollect-*",
  "settings": {
    "index": {
      "refresh_interval": "10s",
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  },
  "mappings": {
      "properties": {
        "rdt": { "type": "string" }
      }
    }
}

I see the following 400 response error when Logstash starts:

[2023-03-24T12:09:36,310][INFO ][logstash.outputs.elasticsearch][main] Installing elasticsearch template to _template/dataCollect_template
[2023-03-24T12:09:36,394][ERROR][logstash.outputs.elasticsearch][main] Failed to install template. {:message=>"Got response code '400' contacting Elasticsearch at URL 'http://xx-xxx-xx-xx:9045/_template/dataCollect_template'", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError",
 :backtrace=>["/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb:80:in `perform_request'",
 "/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:332:in `perform_request_to_url'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:319:in `block in perform_request'",
 "/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:414:in `with_connection'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:318:in `perform_request'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:326:in `block in Pool'",
 "/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:352:in `template_put'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/http_client.rb:86:in `template_install'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/template_manager.rb:28:in `install'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/template_manager.rb:16:in `install_template'",
 "/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/common.rb:197:in `install_template'", 
"/opt/tvportal/elasticsearch/app/logstash-7.6.2/vendor/bundle/jruby/2.5.0/gems/logstash-output-elasticsearch-10.3.3-java/lib/logstash/outputs/elasticsearch/common.rb:53:in `block in setup_after_successful_connection'"]}

Please guide. We are using version 7.6.2.
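
The 400 is most likely coming from the "type": "string" line in the mapping: the string field type was removed after Elasticsearch 5.x, so on 7.6.2 the field has to be mapped as keyword (or text) instead, e.g.

        "rdt": { "type": "keyword" }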

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.