JSON parse error, original data now in message field - Unexpected character (',' (code 44))

I am trying to deploy this pipeline through Kibana's centralized Logstash pipeline management:

input {
  rabbitmq {
    host => "xxxxxxxxxx"
    vhost => "/"
    queue => "outrights"
    port => 5672
    heartbeat => 30
    durable => true
    password => "xxxxxxxx!"
    user => "xxxxxxxx"
  }
}

filter {
  dissect {
    mapping => { "message" => "%{permission},%{sourceID},%{type},%{sessionIndicator},%{ccyPair},%{priceType},%{tenor},%{bidPrice},%{askPrice},%{source},%{region},%{city},%{regionID},%{cityID},%{currentPrice},%{tradeOpen},%{tradeHigh},%{tradeLow},%{trend},%{direction},%{change},%{percentChange},%{tradePrice},%{yesterdayTradeClose},%{tradeDateTime},%{quoteDateTime},%{sentDateTime},%{fractionalIndicator},%{previousTradeDate},%{desktopEligibilityIndicator},%{midPrice}" }
  }
  mutate {
    remove_field => ["permission","desktopEligibilityIndicator","message","regionID","cityID","tags","quoteDateTime","tradeDateTime","trend","direction","sessionIndicator","yesterdayTradeClose","tradeHigh","tradeLow","tradeOpen","tradePrice","currentPrice","previousTradeDate","change","percentChange","type"]
  }
}

output {
  elasticsearch {
    hosts => [ "127.0.0.1:9200" ]
    user => "elastic"
    password => "elastic"
    index => "outrights"
    template_overwrite => "true"
    manage_template => "false"
  }

  stdout { codec => json }
}

This is the ERROR:

[2018-07-03T14:53:19,209][ERROR][logstash.codecs.json     ] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character (',' (code 44)): Expected space separating root-level values
 at [Source: (String)"1321,1321,X,,USDBRL,O,12M,3.97110,3.97620,HSBC,NAM,NYC,160115,163103,,,,,-+-++-++,-,-0.01080,-0.27000,3.97110,,2018-06-28T19:58:28.548Z,2018-06-28T19:58:28.548Z,2018-06-28T19:58:28.548Z,,,1,3.97365,280=4"; line: 1, column: 6]>, :data=>"1321,1321,X,,USDBRL,O,12M,3.97110,3.97620,HSBC,NAM,NYC,160115,163103,,,,,-+-++-++,-,-0.01080,-0.27000,3.97110,,2018-06-28T19:58:28.548Z,2018-06-28T19:58:28.548Z,2018-06-28T19:58:28.548Z,,,1,3.97365,280=4"}

This is my logstash.yml:

# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html


xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.url: http://127.0.0.1:9200
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: elastic
xpack.monitoring.collection.interval: 10s
xpack.monitoring.collection.pipeline.details.enabled: true
xpack.monitoring.elasticsearch.sniffing: false


#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
xpack.management.enabled: true
xpack.management.elasticsearch.url: http://127.0.0.1:9200/
xpack.management.elasticsearch.username: elastic
xpack.management.elasticsearch.password: elastic
xpack.management.logstash.poll_interval: 5s
xpack.management.pipeline.id: outrights

Please can you help?

The default codec for the rabbitmq input is json, so it is trying to parse the messages from the queue as JSON, and they are not JSON, they are CSV. So change the codec on the input: try plain or line.
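Something like this, as a minimal sketch (host and credentials are placeholders, and the rest of your pipeline stays the same):

input {
  rabbitmq {
    codec => plain
    host => "xxxxxxxxxx"
    vhost => "/"
    queue => "outrights"
    port => 5672
    heartbeat => 30
    durable => true
    user => "xxxxxxxx"
    password => "xxxxxxxx!"
  }
}

With codec => plain, each message from the queue arrives as one line of text in the message field, which your existing dissect filter can then split into fields.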

Thanks @Badger for the help ... the response to

input {
  rabbitmq {
    host => "1XX.XX.X52.43"
    vhost => "/"
    queue => "outrights"
    port => 5672
    heartbeat => 30
    durable => true
    password => "xxxxxxxxx!"
    user => "xxxxx"
  }
  stdin { codec => plain }
}

is

       [2018-07-03T18:42:43,413][ERROR][logstash.codecs.json     ] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character (',' (code 44)): Expected space separating root-level values
     at [Source: (String)"1321,1321,X,,EURINR,O,02W,79.87550,79.99070,OTCV,GLO,NYC,71417,163103,,,,,-*++++--,-,-0.02000,-0.03000,79.87550,,2018-06-28T21:05:06.967Z,2018-06-28T21:05:06.967Z,2018-06-28T21:05:06.967Z,,,1,79.93310,280=4"; line: 1, column: 6]>, :data=>"1321,1321,X,,EURINR,O,02W,79.87550,79.99070,OTCV,GLO,NYC,71417,163103,,,,,-*++++--,-,-0.02000,-0.03000,79.87550,,2018-06-28T21:05:06.967Z,2018-06-28T21:05:06.967Z,2018-06-28T21:05:06.967Z,,,1,79.93310,280=4"}

and this is the response to "line":

    [2018-07-03T18:48:16,060][ERROR][logstash.codecs.json     ] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character (',' (code 44)): Expected space separating root-level values
 at [Source: (String)"1321,1321,X,,INRHKD,O,11M,0.10840,0.10880,OTCV,GLO,NYC,71417,163103,,,,,-*******,-,0.00000,0.00000,0.10840,,2018-06-28T21:10:13.281Z,2018-06-28T21:10:13.281Z,2018-06-28T21:10:13.281Z,,,1,0.10860,280=4"; line: 1, column: 6]>, :data=>"1321,1321,X,,INRHKD,O,11M,0.10840,0.10880,OTCV,GLO,NYC,71417,163103,,,,,-*******,-,0.00000,0.00000,0.10840,,2018-06-28T21:10:13.281Z,2018-06-28T21:10:13.281Z,2018-06-28T21:10:13.281Z,,,1,0.10860,280=4"}

Do you have any other ideas? Any would be very welcome.

No, the codec has to be specified on the rabbitmq input, not another input.

input {
    rabbitmq {
        codec => plain
        host => "1XX.XX.X52.43"
        [...]
    }
}

Tried

input {
  rabbitmq {
    codec => plain
    host => "xxxxxxxxx"
    vhost => "/"
    queue => "outrights"
    port => 5672
    heartbeat => 30
    durable => true
    password => "xxxxxxxxxxx!"
    user => "xxxx"
  }
}

and got:

JBB:logstash-6.3.0 jonathanbowker$ bin/logstash
Sending Logstash's logs to /Users/jonathanbowker/Servers/ELK-2/logstash-6.3.0/logs which is now configured via log4j2.properties
[2018-07-03T20:26:45,016][INFO ][logstash.configmanagement.bootstrapcheck] Using Elasticsearch as config store {:pipeline_id=>["outrights"], :poll_interval=>"5000000000ns"}
[2018-07-03T20:26:46,142][INFO ][logstash.licensechecker.licensereader] Elasticsearch pool URLs updated {:changes=>{:removed=>, :added=>[http://elastic:xxxxxx@127.0.0.1:9200/]}}
[2018-07-03T20:26:46,170][INFO ][logstash.licensechecker.licensereader] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@127.0.0.1:9200/, :path=>"/"}
[2018-07-03T20:26:46,552][WARN ][logstash.licensechecker.licensereader] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@127.0.0.1:9200/"}
[2018-07-03T20:26:46,630][INFO ][logstash.licensechecker.licensereader] ES Output version determined {:es_version=>6}
[2018-07-03T20:26:46,636][WARN ][logstash.licensechecker.licensereader] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-07-03T20:26:46,954][INFO ][logstash.configmanagement.elasticsearchsource] Configuration Management License OK
[2018-07-03T20:26:48,248][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.3.0"}
[2018-07-03T20:26:48,403][INFO ][logstash.configmanagement.elasticsearchsource] Elasticsearch pool URLs updated {:changes=>{:removed=>, :added=>[http://elastic:xxxxxx@127.0.0.1:9200/]}}
[2018-07-03T20:26:48,406][INFO ][logstash.configmanagement.elasticsearchsource] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@127.0.0.1:9200/, :path=>"/"}
[2018-07-03T20:26:48,418][WARN ][logstash.configmanagement.elasticsearchsource] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@127.0.0.1:9200/"}
[2018-07-03T20:26:48,425][INFO ][logstash.configmanagement.elasticsearchsource] ES Output version determined {:es_version=>6}
[2018-07-03T20:26:48,426][WARN ][logstash.configmanagement.elasticsearchsource] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-07-03T20:26:52,018][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::Elasticsearch hosts=>[http://127.0.0.1:9200], bulk_path=>"/_xpack/monitoring/_bulk?system_id=logstash&system_api_version=2&interval=1s", manage_template=>false, document_type=>"%{[@metadata][document_type]}", sniffing=>false, user=>"elastic", password=>, id=>"de99f30988270b50b83b747a519e135f6ca04c3d180322ff51ab3fe067f52fe1", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_fa2021e4-2b6e-4b29-a639-659754f4207c", enable_metric=>true, charset=>"UTF-8">, workers=>1, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, action=>"index", ssl_certificate_verification=>true, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
[2018-07-03T20:26:52,195][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>50}
[2018-07-03T20:26:52,360][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>, :added=>[http://elastic:xxxxxx@127.0.0.1:9200/]}}
[2018-07-03T20:26:52,364][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@127.0.0.1:9200/, :path=>"/"}
[2018-07-03T20:26:52,435][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@127.0.0.1:9200/"}
[2018-07-03T20:26:52,455][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-03T20:26:52,457][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-07-03T20:26:52,513][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::Elasticsearch", :hosts=>["http://127.0.0.1:9200"]}
[2018-07-03T20:26:52,699][INFO ][logstash.licensechecker.licensereader] Elasticsearch pool URLs updated {:changes=>{:removed=>, :added=>[http://elastic:xxxxxx@127.0.0.1:9200/]}}
[2018-07-03T20:26:52,701][INFO ][logstash.licensechecker.licensereader] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@127.0.0.1:9200/, :path=>"/"}
[2018-07-03T20:26:52,708][WARN ][logstash.licensechecker.licensereader] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@127.0.0.1:9200/"}
[2018-07-03T20:26:52,719][INFO ][logstash.licensechecker.licensereader] ES Output version determined {:es_version=>6}
[2018-07-03T20:26:52,720][WARN ][logstash.licensechecker.licensereader] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-07-03T20:26:52,863][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x246d1ec9@/Users/jonathanbowker/Servers/ELK-2/logstash-6.3.0/logstash-core/lib/logstash/pipeline_action/create.rb:48 run>"}
[2018-07-03T20:27:02,977][INFO ][logstash.inputs.metrics ] Monitoring License OK
[2018-07-03T20:27:03,079][ERROR][logstash.inputs.metrics ] Failed to create monitoring event {:message=>"For path: http_address. Map keys: [:stats, :jvm]", :error=>"LogStash::Instrument::MetricStore::MetricNotFound"}

Thank you @Badger, resolved.
