Failed action with response of 400, dropping action (Logstash to Kafka to Logstash to ES and Mongo) (SOLVED)

Hi All,

JSON messages that contain string values cannot be parsed successfully by Logstash. The following is my Logstash configuration (similar to http://stackoverflow.com/questions/30542879/how-to-use-logstashs-json-filter):

input {
        kafka {
                topic_id => "neptunus"
                zk_connect => "zookeeper4kafka:2181"
                type => "json"
        }
}

output {
        stdout { codec => rubydebug }
        elasticsearch {
                host => "140.92.25.59"
                codec => "json"
                protocol => "http"
                index => "logstash-%{+YYYY.MM.dd}"
        }

        mongodb {
                uri => "mongodb://mongo4kafka"
                database => "kafkatest"
                collection => "hellomongo"
                codec => "json"
        }

        mongodb {
                uri => "mongodb://mongo264kafka"
                database => "kafkatest"
                collection => "hellomongo"
                codec => "json"
        }
}

When none of the values in the JSON message is a string, it always works fine, but if any key in the message has a string value, an error like the following occurs:

{
       "message" => "{\"address\":\"55555\",\"b\":553,\"timestamp\":1425094186000}",
      "@version" => "1",
    "@timestamp" => "2015-10-21T05:48:19.660Z",
          "host" => "logstashpubkafka",
          "path" => "/var/log/kafka/test",
          "type" => "json",
       "address" => "55555",
             "b" => 553,
     "timestamp" => 1425094186000
}
{
       "message" => "{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}",
      "@version" => "1",
    "@timestamp" => "2015-10-21T05:48:35.672Z",
          "host" => "logstashpubkafka",
          "path" => "/var/log/kafka/test",
          "type" => "json",
       "address" => "abcde",
             "b" => 553,
     "timestamp" => 1425094186000
}
failed action with response of 400, dropping action: ["index", {:_id=>nil, :_index=>"logstash-2015.10.21", :_type=>"json", :_routing=>nil}, #<LogStash::Event:0x4820eb7d @metadata_accessors=#<LogStash::Util::Accessors:0x6859c7bb @store={"retry_count"=>0}, @lut={}>, @cancelled=false, @data={"message"=>"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}", "@version"=>"1", "@timestamp"=>"2015-10-21T05:48:35.672Z", "host"=>"logstashpubkafka", "path"=>"/var/log/kafka/test", "type"=>"json", "address"=>"abcde", "b"=>553, "timestamp"=>1425094186000}, @metadata={"retry_count"=>0}, @accessors=#<LogStash::Util::Accessors:0x2997ee7e @store={"message"=>"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}", "@version"=>"1", "@timestamp"=>"2015-10-21T05:48:35.672Z", "host"=>"logstashpubkafka", "path"=>"/var/log/kafka/test", "type"=>"json", "address"=>"abcde", "b"=>553, "timestamp"=>1425094186000}, @lut={"type"=>[{"message"=>"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}", "@version"=>"1", "@timestamp"=>"2015-10-21T05:48:35.672Z", "host"=>"logstashpubkafka", "path"=>"/var/log/kafka/test", "type"=>"json", "address"=>"abcde", "b"=>553, "timestamp"=>1425094186000}, "type"]}>>] {:level=>:warn}

Can anyone help?

Is there anything interesting in the ES log that could give clues about why the document was rejected?

Hi Magnus,

I found this in the ES debug log:

[2015-10-21 13:48:30,065][DEBUG][action.bulk              ] [datalake_es] [logstash-2015.10.21][2] failed to execute bulk item (index) index {[logstash-2015.10.21][json][AVCI8QvwKg1WubLfTgfH], source[{"message":"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}","@version":"1","@timestamp":"2015-10-21T05:48:35.672Z","host":"logstashpubkafka","path":"/var/log/kafka/test","type":"json","address":"abcde","b":553,"timestamp":1425094186000}]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [address]
        at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:411)
        at org.elasticsearch.index.mapper.object.ObjectMapper.serializeValue(ObjectMapper.java:706)
        at org.elasticsearch.index.mapper.object.ObjectMapper.parse(ObjectMapper.java:497)
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:544)
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:493)
        at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:466)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:418)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:148)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase.performOnPrimary(TransportShardReplicationOperationAction.java:574)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1.doRun(TransportShardReplicationOperationAction.java:440)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "abcde"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at org.elasticsearch.common.xcontent.support.AbstractXContentParser.longValue(AbstractXContentParser.java:145)
        at org.elasticsearch.index.mapper.core.LongFieldMapper.innerParseCreateField(LongFieldMapper.java:288)
        at org.elasticsearch.index.mapper.core.NumberFieldMapper.parseCreateField(NumberFieldMapper.java:239)
        at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:401)
        ... 13 more

Why can't I use a string value in the JSON?

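Your address field has previously been mapped as an integer type (long, judging by the LongFieldMapper frames in the stack trace) because the first time that field was set it had an integer value. Integer fields can't be assigned string values that don't parse as numbers, which is why "55555" was accepted but "abcde" was rejected. Changing the mapping of an existing field requires reindexing.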
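
To make the explanation above concrete, here is a rough Python sketch of the behaviour. It is a toy model only, not how Elasticsearch is actually implemented: the names detect_type, index_document and MapperParsingError are made up for illustration, and the first event with a numeric address is an assumption about what was sent earlier that day.

```python
# Toy model of Elasticsearch dynamic mapping. Hypothetical names throughout;
# this only imitates the behaviour seen in the logs in this thread.


class MapperParsingError(Exception):
    """Stands in for Elasticsearch's MapperParsingException."""


def detect_type(value):
    """Guess a field type from the first value seen for a field."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"


def index_document(mapping, doc):
    """Check doc against mapping; fields not seen before get a type on first use."""
    for field, value in doc.items():
        field_type = mapping.get(field) or detect_type(value)
        if field_type == "long":
            try:
                int(value)  # numeric strings such as "55555" are coerced
            except (TypeError, ValueError):
                raise MapperParsingError("failed to parse [%s]" % field)
        mapping[field] = field_type
    return mapping


if __name__ == "__main__":
    mapping = {}
    # Assumed earlier event: address arrived as a number, so it is mapped as long.
    index_document(mapping, {"address": 55555, "b": 553})
    # A numeric string still fits the long mapping.
    index_document(mapping, {"address": "55555", "b": 553})
    try:
        index_document(mapping, {"address": "abcde", "b": 553})
    except MapperParsingError as err:
        print(err)  # failed to parse [address]
```

In this model, as in the logs, the mapping is fixed by the first value and later documents have to fit it. An empty mapping would accept "abcde" and map address as a string, which is roughly what reindexing into a fresh index with the right mapping achieves.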

Hi Magnus,

I see, thanks

Jason