Error sending data from Kafka to Elasticsearch via Logstash

Hi All,

I am sending Metricbeat data to a Beats input in Graylog. The data is arriving in streams and being stored in an Elasticsearch index (metricbeat_0).

From the streams I am trying to send data to Kafka via Manage Outputs in Graylog. We have created a customized Kafka output plugin, and below is the Kafka config in Manage Outputs:

Graylog-Kafka ID: 5d247036c4566734032f9382
Type: org.graylog.plugins.kafka.KafkaOutput
  
TOPIC: metricbeat
ack: all
batch_size: 16384
broker_server: localhost:9092
buffer_memory: 33554432
linger_ms: 1
retries: 0

When I check the Kafka topic metricbeat I can see the data:

ganeshbabur@localhost:/usr/local/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metricbeat
{"windows_perfmon_processor_handle_count":379,"gl2_remote_ip":"157.49.222.215","@metadata_version":"6.3.2","gl2_remote_port":59537,"beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","@metadata_beat":"metricbeat","metricset_name":"perfmon","windows_perfmon_processor_handleCount":"dwm","beat_name":"localhost","@timestamp":"2019-07-09T18:25:47.062Z","@metadata_type":"doc","metricset_rtt":21866,"beat_version":"6.3.2","gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","_id":"f9016693-a276-11e9-ac0b-0a580af40001","host_name":"localhost","timestamp":2019-07-09T18:25:47.062Z}
{"gl2_remote_ip":"157.49.222.215","@metadata_version":"6.3.2","gl2_remote_port":59537,"beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","windows_perfmon_processor_name":"svchost","@metadata_beat":"metricbeat","metricset_name":"perfmon","beat_name":"localhost","@timestamp":"2019-07-09T18:25:47.062Z","@metadata_type":"doc","metricset_rtt":23852,"windows_perfmon_processor_workingset_bytes":2.3740416E7,"beat_version":"6.3.2","gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","_id":"f9025102-a276-11e9-ac0b-0a580af40001","host_name":"localhost","timestamp":2019-07-09T18:25:47.062Z}

Now I am using Logstash to write data from the Kafka topic to an Elasticsearch index. Below is the config I tried:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["metricbeat"]
    auto_commit_interval_ms => "100"
    auto_offset_reset => "latest"
    codec => json
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "testbeat"
  }
  stdout { codec => rubydebug }
}

Below is the error I am getting:

[2019-07-09T18:37:38,596][ERROR][logstash.codecs.json] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character ('-' (code 45)): was expecting comma to separate Object entries
at [Source: (String)" {"windows_perfmon_processor_handle_count":366,"gl2_remote_ip":"203.90.4.250","@metadata_version":"6.3.2","gl2_remote_port":61473, "beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat", "gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","@metadata_beat":"metricbeat","windows_perfmon_processor_handleCount":"slack","metricset_name":"perfmon","beat_name":"localhost","@timestamp":"2019-07-09T13:15:58.389Z","@metadata_type":"doc","metri"[truncated 205 chars]; line: 1, column: 686]>, :data=>"{"windows_perfmon_processor_handle_count":366,"gl2_remote_ip":"203.90.4.250","@metadata_version":"6.3.2","gl2_remote_port":61473,"beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","@metadata_beat":"metricbeat","windows_perfmon_processor_handleCount":"slack","metricset_name":"perfmon","beat_name":"localhost","@timestamp":"2019-07-09T13:15:58.389Z","@metadata_type":"doc","metricset_rtt":11981,"beat_version":"6.3.2","gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","_id":"b167d65c-a24b-11e9-ac0b-0a580af40001","host_name":"localhost","timestamp":2019-07-09T13:15:58.389Z}"}

I can see that the "message" field in the data has the value "-". Why does Logstash fail when the value is "-"? And the testbeat index in Elasticsearch contains the document below, tagged with a JSON parse failure:

{
        "_index": "testbeat",
        "_type": "doc",
        "_id": "ed4L2GsBLabQ6slboePR",
        "_score": 1,
        "_source": {
          "@timestamp": "2019-07-09T18:42:48.491Z",
          "message": """{"system_memory_actual_used_bytes":6538346496,"system_memory_actual_used_pct":0.7732,"system_memory_swap_total":24561954816,"gl2_remote_ip":"203.90.4.250","@metadata_version":"6.3.2","gl2_remote_port":61473,"system_memory_total":8455827456,"source":"localhost","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"system","@metadata_beat":"metricbeat","metricset_name":"memory","beat_name":"localhost","@metadata_type":"doc","system_memory_used_bytes":6538346496,"beat_version":"6.3.2","system_memory_used_pct":0.7732,"gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","system_memory_free":1917480960,"system_memory_swap_free":14430650368,"timestamp":2019-07-09T13:15:58.389Z,"beat_hostname":"localhost","message":"-","@timestamp":"2019-07-09T13:15:58.389Z","system_memory_swap_used_pct":0.4125,"system_memory_actual_free":1917480960,"_id":"b1682470-a24b-11e9-ac0b-0a580af40001","system_memory_swap_used_bytes":10131304448,"host_name":"localhost"}""",
          "tags": [
            "_jsonparsefailure"
          ],
          "@version": "1"
        }
      }

Please correct me if I am doing anything wrong, and let me know your thoughts on how to resolve this issue.

Thanks,
Ganeshbabu R

That is not valid JSON, so a json filter will log exactly that error when it tries to parse it.
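The problem appears to be the bare timestamp value near the end of the payload (which is where column 686 in the error points). For example, from your console-consumer output:

"timestamp":2019-07-09T18:25:47.062Z     <- bare value, not valid JSON
"timestamp":"2019-07-09T18:25:47.062Z"   <- what a string value should look like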

Yes, the timestamp field is not proper JSON; that timestamp is generated by Graylog when the message enters the stream.

OK, so you cannot use a codec. You will need to use mutate+gsub to add quotes to the unquoted timestamp, then use a json filter.
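Something along these lines should work. This is a minimal, untested sketch; it assumes the only unquoted value is this one timestamp field and that it always matches the ISO-8601 shape seen in your samples:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["metricbeat"]
    # no json codec here - the payload is not valid JSON yet,
    # so the plain codec leaves the raw text in the message field
  }
}
filter {
  # Quote the bare timestamp value, e.g. "timestamp":2019-07-09T18:25:47.062Z
  # becomes "timestamp":"2019-07-09T18:25:47.062Z"
  mutate {
    gsub => ["message", '"timestamp":([0-9TZ:.-]+)', '"timestamp":"\1"']
  }
  # The message is now valid JSON and can be parsed
  json {
    source => "message"
  }
}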

Hi @Badger

I tried using mutate in the Logstash filter; below is my config.

As you suggested I tried to add quotes to the unquoted timestamp, but I couldn't get the gsub syntax to work, so instead I tried removing the timestamp from the message entirely. Below is the mutate gsub I tried. I tested the change with sample data: the timestamp field was removed from the message and the data was indexed successfully.

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["testbeat"]
    auto_commit_interval_ms => "100"
    auto_offset_reset => "latest"
    decorate_events => true
  }
}
filter {
  json {
    source => "message"
  }
  mutate {
    gsub => ["message", "\"timestamp[^<]+\,", ""]
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    manage_template => false
    index => "testbeat"
  }
}

Below is the raw message I am getting from the Kafka topic "testbeat"; messages are still failing in Logstash due to a JSON parse error:

[ERROR] 2019-07-16 17:54:01.418 [Ruby-0-Thread-31: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/
inputs/kafka.rb:242] json - JSON parse error, original data now in message field 
{:error=>#<LogStash::Json::ParserError: Unexpected character ('-' (code 45)): was expecting comma to separate Object entries at 
[Source: (String)"{"collector_node_id":"DEMO","gl2_remote_ip":"localhost","@metadata_version":"6.8.1","gl2_remote_port":34816,
"source":"demo-server","beats_type":"metricbeat","gl2_source_input":"5d1f3b484d72b4d7729","metricset_module":"windows",
"@metadata_beat":"metricbeat","beat_name":"demo-server","metricset_name":"perfmon","@metadata_type":"doc","metricset_rtt":264691,
"beat_version":"6.8.1","gl2_source_node":"9055a442-306c6-e6d390f68817",
"windows_perfmon_iis_appservice_get_requests_value"[truncated 362 chars]; line: 1, column: 522]>, 
:data=>"{\"collector_node_id\":\"DEMO\",\"gl2_remote_ip\":\"localhost\",\"@metadata_version\":\"6.8.1\",\"gl2_remote_port\":34816,
\"source\":\"demo-server\",\"beats_type\":\"metricbeat\",\"gl2_source_input\":\"5d1f3b484d72104d7729\",\"metricset_module\":\"windows\",
\"@metadata_beat\":\"metricbeat\",\"beat_name\":\"demo-server\",\"metricset_name\":\"perfmon\",\"@metadata_type\":\"doc\",\"metricset_rtt\":264691,
\"beat_version\":\"6.8.1\",\"gl2_source_node\":\"9055a442-306c-45456d390f68817\",\"windows_perfmon_iis_appservice_get_requests_value\":0,
\"timestamp\":2019-07-16T17:53:27.560Z,\"windows_perfmon_iis_appservice_get_request\":\"V2WR_IN_0_Dev\",\"gl2_source_collector\":\"DEMO\",
\"event_dataset\":\"windows.perfmon\",\"beat_hostname\":\"demo-server\",\"event_duration\":264691900,\"message\":\"-\",\"@timestamp\":\"2019-07-16T17:53:27.560Z\",
\"_id\":\"b16c9bc3-a7f2-11bc436feb8\",\"host_name\":\"demo-server\"}"}

And in the Elasticsearch index, documents tagged with _jsonparsefailure were indexed; below is a sample doc:

{
  "_index" : "testbeat",
  "_type" : "doc",
  "_id" : "uH8G_GsB481WS89W8gqz",
  "_score" : 1.0,
  "_source" : {
    "tags" : [
      "_jsonparsefailure"
    ],
    "@version" : "1",
    "message" : "{\"collector_node_id\":\"DEMO\",\"gl2_remote_ip\":\"10.244.0.1\",\"@metadata_version\":\"6.8.1\",\"gl2_remote_port\":34816,\"source\":\"demo-server\",\"beats_type\":\"metricbeat\",\"gl2_source_input\":\"5d1f3b484d72b04d7729\",\"metricset_module\":\"windows\",\"@metadata_beat\":\"metricbeat\",\"metricset_name\":\"perfmon\",\"beat_name\":\"demo-server\",\"@metadata_type\":\"doc\",\"metricset_rtt\":259380,\"beat_version\":\"6.8.1\",\"gl2_source_node\":\"b14f563b-81f5-eb-8b7126db7c45\",\"host_name\":\"demo-server\"}",
    "@timestamp" : "2019-07-16T18:24:01.495Z"
  }
}

Kindly share your thoughts and help us resolve this error.

Regards,
Ganeshbabu R

Move the mutate so that it is before the json filter.
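For example (a sketch of the reordered filter, not tested against your data; note that [^<]+ in your pattern matches all the way to the last comma in the message and would strip most of it, so a tighter pattern is assumed here, along with the assumption that timestamp is never the first field in the object):

filter {
  # Strip the unquoted timestamp while "message" is still raw text;
  # the leading ,? also swallows the preceding comma when the field is
  # last in the object, and [^,}]+ stops at the next delimiter
  mutate {
    gsub => ["message", ",?\"timestamp\":[^,}]+", ""]
  }
  # Only then parse the now-valid JSON
  json {
    source => "message"
  }
}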
