Failed action with response of 400, dropping action (Logstash to Kafka to Logstash to ES and Mongo) (SOLVED)


(Jason Zheng) #1

Hi All,

JSON messages containing string values cannot be parsed successfully by Logstash. The following is my Logstash configuration (similar to http://stackoverflow.com/questions/30542879/how-to-use-logstashs-json-filter):

input {
        kafka {
                topic_id => "neptunus"
                zk_connect => "zookeeper4kafka:2181"
                type => "json"
        }
}

output {
        stdout { codec => rubydebug }
        elasticsearch {
                host => "140.92.25.59"
                codec => "json"
                protocol => "http"
                index => "logstash-%{+YYYY.MM.dd}"
        }

        mongodb {
                uri => "mongodb://mongo4kafka"
                database => "kafkatest"
                collection => "hellomongo"
                codec => "json"
        }

        mongodb {
                uri => "mongodb://mongo264kafka"
                database => "kafkatest"
                collection => "hellomongo"
                codec => "json"
        }
}

When none of the values in the JSON message are strings it always works fine, but as soon as any key has a string value, an error like the following occurs:

{
       "message" => "{\"address\":\"55555\",\"b\":553,\"timestamp\":1425094186000}",
      "@version" => "1",
    "@timestamp" => "2015-10-21T05:48:19.660Z",
          "host" => "logstashpubkafka",
          "path" => "/var/log/kafka/test",
          "type" => "json",
       "address" => "55555",
             "b" => 553,
     "timestamp" => 1425094186000
}
{
       "message" => "{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}",
      "@version" => "1",
    "@timestamp" => "2015-10-21T05:48:35.672Z",
          "host" => "logstashpubkafka",
          "path" => "/var/log/kafka/test",
          "type" => "json",
       "address" => "abcde",
             "b" => 553,
     "timestamp" => 1425094186000
}
failed action with response of 400, dropping action: ["index", {:_id=>nil, :_index=>"logstash-2015.10.21", :_type=>"json", :_routing=>nil}, #<LogStash::Event:0x4820eb7d @metadata_accessors=#<LogStash::Util::Accessors:0x6859c7bb @store={"retry_count"=>0}, @lut={}>, @cancelled=false, @data={"message"=>"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}", "@version"=>"1", "@timestamp"=>"2015-10-21T05:48:35.672Z", "host"=>"logstashpubkafka", "path"=>"/var/log/kafka/test", "type"=>"json", "address"=>"abcde", "b"=>553, "timestamp"=>1425094186000}, @metadata={"retry_count"=>0}, @accessors=#<LogStash::Util::Accessors:0x2997ee7e @store={"message"=>"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}", "@version"=>"1", "@timestamp"=>"2015-10-21T05:48:35.672Z", "host"=>"logstashpubkafka", "path"=>"/var/log/kafka/test", "type"=>"json", "address"=>"abcde", "b"=>553, "timestamp"=>1425094186000}, @lut={"type"=>[{"message"=>"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}", "@version"=>"1", "@timestamp"=>"2015-10-21T05:48:35.672Z", "host"=>"logstashpubkafka", "path"=>"/var/log/kafka/test", "type"=>"json", "address"=>"abcde", "b"=>553, "timestamp"=>1425094186000}, "type"]}>>] {:level=>:warn}
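For reference, one way to push the same two test messages straight onto the topic is the stock Kafka console producer (just a sketch; in my real setup another Logstash instance tails /var/log/kafka/test and publishes to Kafka, and kafkapub:9092 below is only a placeholder for the actual broker):

# message whose string value is all-numeric: indexed fine
echo '{"address":"55555","b":553,"timestamp":1425094186000}' | kafka-console-producer.sh --broker-list kafkapub:9092 --topic neptunus
# message with a non-numeric string value: triggers the 400 above
echo '{"address":"abcde","b":553,"timestamp":1425094186000}' | kafka-console-producer.sh --broker-list kafkapub:9092 --topic neptunus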

Can anyone help?


(Magnus Bäck) #2

Is there anything interesting in the ES log that could give clues about why the document was rejected?
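On a standard package install the logs usually live under /var/log/elasticsearch/ (that path is an assumption about your setup), and grepping for bulk failures is a quick way to find them. Note that item-level mapping failures may only appear once debug logging is enabled for action.bulk (e.g. in logging.yml):

# look for per-item bulk failures in the ES logs (path assumed)
grep -i "failed to execute bulk item" /var/log/elasticsearch/*.log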


(Jason Zheng) #3

Hi Magnus,

I found the following in the debug log:

[2015-10-21 13:48:30,065][DEBUG][action.bulk              ] [datalake_es] [logstash-2015.10.21][2] failed to execute bulk item (index) index {[logstash-2015.10.21][json][AVCI8QvwKg1WubLfTgfH], source[{"message":"{\"address\":\"abcde\",\"b\":553,\"timestamp\":1425094186000}","@version":"1","@timestamp":"2015-10-21T05:48:35.672Z","host":"logstashpubkafka","path":"/var/log/kafka/test","type":"json","address":"abcde","b":553,"timestamp":1425094186000}]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [address]
        at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:411)
        at org.elasticsearch.index.mapper.object.ObjectMapper.serializeValue(ObjectMapper.java:706)
        at org.elasticsearch.index.mapper.object.ObjectMapper.parse(ObjectMapper.java:497)
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:544)
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:493)
        at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:466)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:418)
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:148)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase.performOnPrimary(TransportShardReplicationOperationAction.java:574)
        at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$PrimaryPhase$1.doRun(TransportShardReplicationOperationAction.java:440)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "abcde"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at org.elasticsearch.common.xcontent.support.AbstractXContentParser.longValue(AbstractXContentParser.java:145)
        at org.elasticsearch.index.mapper.core.LongFieldMapper.innerParseCreateField(LongFieldMapper.java:288)
        at org.elasticsearch.index.mapper.core.NumberFieldMapper.parseCreateField(NumberFieldMapper.java:239)
        at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:401)
        ... 13 more
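
To see where that long parsing comes from, the index mapping can be checked directly (a quick sketch against the ES host from my config; the index name is the one in the error above):

# dump the mapping for today's index; if "address" shows up as type "long",
# that would explain why the string value "abcde" is rejected
curl -XGET 'http://140.92.25.59:9200/logstash-2015.10.21/_mapping?pretty'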

(Jason Zheng) #4

Why can't I use a string value in the JSON?


(Magnus Bäck) #5

Your address field has previously been mapped as an integer because the first time that field was set it had an integer value. Integer fields can't be assigned string values. Changing the mapping of an existing field requires reindexing.
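If you want address to be a string going forward, one option is an index template so that new daily logstash-* indices map it explicitly (just a sketch using the ES 1.x string type and the "json" document type from your events; the template name is arbitrary, and already-created indices still need reindexing):

# applies only to indices created after the template exists
curl -XPUT 'http://140.92.25.59:9200/_template/address_as_string' -d '{
  "template": "logstash-*",
  "mappings": {
    "json": {
      "properties": {
        "address": { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}'

Alternatively, a mutate filter in Logstash (convert => [ "address", "string" ]) would force the field to a consistent type before it reaches Elasticsearch.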


(Jason Zheng) #6

Hi Magnus,

I see, thanks

Jason

