S3 input and Elasticsearch output in Logstash

Hi all,

I am trying to get familiar with the S3 plugin in Logstash in two steps:
1 - Pushing logs to S3 as output
2 - Getting logs from S3 as input

1 - The Logstash conf file looks like this:

output {
        s3{
                access_key_id => "***"
                secret_access_key => "******"
                endpoint_region => "**-***-*"
                bucket => "bucket-name"
                size_file => 2048
                codec => "json"
                canned_acl => "private"
        }
}

The logs I am sending look like this:

{"_id":"55fee3c726045db681acd139","index":0,"guid":"199836ff-bbfd-4a2f-9e66-452e72342e24","isActive":false,"balance":"$3,565.23","picture":"http://placehold.it/32x32","age":38,"eyeColor":"green","name":{"first":"Brandie","last":"Blackwell"}}
 ...

This part seems to work fine, as the logs end up stored in S3, split across multiple 2 KB files:

{"_id":"55fee3c726045db681acd139","index":0,"guid":"199836ff-bbfd-4a2f-9e66-452e72342e24","isActive":false,"balance":"$3,565.23","picture":"http://placehold.it/32x32","age":38,"eyeColor":"green","name":{"first":"Brandie","last":"Blackwell"},"@version":"1","@timestamp":"2015-09-29T12:20:12.679Z","host":"*.*.*.*"}
...

2 - However, when reading the logs back as input and trying to send them to Elasticsearch, I ran into issues:

The config file:

input {
        s3{
                access_key_id => "****"
                secret_access_key => "*******"
                region => "**-***-*"
                bucket => "bucket-name"
                codec => "json"
        }
}
output {
        elasticsearch {
                host => "*.*.*.*"
                protocol => "http"
        }
}

Logstash reads the logs from S3 but fails to index them into Elasticsearch:

failed action with response of 400, dropping action: ["index", {:_id=>nil, :_index=>"logstash-2015.09.29", :_type=>"logs", :_routing=>nil}, #<LogStash::Event:0x324fbb79 @metadata_accessors=#<LogStash::Util::Accessors:0x6f0fb4c6 @store={"retry_count"=>0}, @lut={}>, @cancelled=false, @data={"_id"=>"55fee3c7643281f83e914515", "index"=>70, "guid"=>"891dfae1-c48d-483a-a065-f61f37ed41a3", "isActive"=>true, "balance"=>"$3,263.89", "picture"=>"http://placehold.it/32x32", "age"=>26, "eyeColor"=>"green", "name"=>{"first"=>"Parrish", "last"=>"Baxter"}, "@version"=>"1", "@timestamp"=>"2015-09-29T12:59:31.543Z", "host"=>"..."}, @metadata={"retry_count"=>0}, @accessors=#<LogStash::Util::Accessors:0x4c9b374a @store={"_id"=>"55fee3c7643281f83e914515", "index"=>70, "guid"=>"891dfae1-c48d-483a-a065-f61f37ed41a3", "isActive"=>true, "balance"=>"$3,263.89", "picture"=>"http://placehold.it/32x32", "age"=>26, "eyeColor"=>"green", "name"=>{"first"=>"Parrish", "last"=>"Baxter"}, "@version"=>"1", "@timestamp"=>"2015-09-29T12:59:31.543Z", "host"=>"..."}, @lut={"message"=>[{"_id"=>"55fee3c7643281f83e914515", "index"=>70, "guid"=>"891dfae1-c48d-483a-a065-f61f37ed41a3", "isActive"=>true, "balance"=>"$3,263.89", "picture"=>"http://placehold.it/32x32", "age"=>26, "eyeColor"=>"green", "name"=>{"first"=>"Parrish", "last"=>"Baxter"}, "@version"=>"1", "@timestamp"=>"2015-09-29T12:59:31.543Z", "host"=>"..."}, "message"], "type"=>[{"_id"=>"55fee3c7643281f83e914515", "index"=>70, "guid"=>"891dfae1-c48d-483a-a065-f61f37ed41a3", "isActive"=>true, "balance"=>"$3,263.89", "picture"=>"http://placehold.it/32x32", "age"=>26, "eyeColor"=>"green", "name"=>{"first"=>"Parrish", "last"=>"Baxter"}, "@version"=>"1", "@timestamp"=>"2015-09-29T12:59:31.543Z", "host"=>"..."}, "type"]}>>] {:level=>:warn}

Could you please help me find out what I am missing here?

Check your ES logs; a 400 response should report something in those.

The Elasticsearch logs say that it failed to execute a bulk item (index) because it failed to parse [_id]:

[2015-09-29 14:11:55,883][INFO ][node ] [Commander Kraken] version[1.4.4], pid[30103], build[c88f77f/2015-02-19T13:05:36Z]
[2015-09-29 14:11:55,884][INFO ][node ] [Commander Kraken] initializing ...
[2015-09-29 14:11:55,944][INFO ][plugins ] [Commander Kraken] loaded [cloud-aws], sites [bigdesk, head]
[2015-09-29 14:12:00,568][INFO ][node ] [Commander Kraken] initialized
[2015-09-29 14:12:00,568][INFO ][node ] [Commander Kraken] starting ...
[2015-09-29 14:12:00,659][INFO ][transport ] [Commander Kraken] bound_address {inet[/0:0:0:0:0:0:0:0:9300]}, publish_address {inet[localhost/127.0.0.1:930$
[2015-09-29 14:12:00,668][INFO ][discovery ] [Commander Kraken] S3-IO-Cluster/vrI8A3B5TNutb6ZrFQT1eQ
[2015-09-29 14:12:05,027][INFO ][cluster.service ] [Commander Kraken] new_master [Commander Kraken][vrI8A3B5TNutb6ZrFQT1eQ][ip----][inet[localhost/127.$
[2015-09-29 14:12:05,054][INFO ][http ] [Commander Kraken] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[localhost/127.0.0.1:920$
[2015-09-29 14:12:05,054][INFO ][node ] [Commander Kraken] started
[2015-09-29 14:12:05,061][INFO ][gateway ] [Commander Kraken] recovered [0] indices into cluster_state
[2015-09-29 14:24:19,788][INFO ][cluster.metadata ] [Commander Kraken] [logstash-2015.09.29] creating index, cause [auto(bulk api)], shards [5]/[1], mappings [_d$
[2015-09-29 14:24:20,191][DEBUG][action.bulk ] [Commander Kraken] [logstash-2015.09.29][4] failed to execute bulk item (index) index {[logstash-2015.09.29][$
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [_id]
at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:416)
at org.elasticsearch.index.mapper.internal.IdFieldMapper.parse(IdFieldMapper.java:295)
at org.elasticsearch.index.mapper.object.ObjectMapper.serializeValue(ObjectMapper.java:709)
at org.elasticsearch.index.mapper.object.ObjectMapper.parse(ObjectMapper.java:500)
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:542)
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:491)
at org.elasticsearch.index.shard.service.InternalIndexShard.prepareCreate(InternalIndexShard.java:392)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:444)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:150)
at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction.performOnPrimary(TransportShardReplicationOper$
at org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction$AsyncShardOperationAction$1.run(TransportShardReplicationOperationAction$
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.index.mapper.MapperParsingException: Provided id [AVAZfWaivQTTvmzMp1dt] does not match the content one [55fee3c726045db681acd139]
at org.elasticsearch.index.mapper.internal.IdFieldMapper.parseCreateField(IdFieldMapper.java:310)
at org.elasticsearch.index.mapper.core.AbstractFieldMapper.parse(AbstractFieldMapper.java:406)
... 13 more

Apparently Elasticsearch generates its own document id for each bulk item, and the "_id" field carried inside the JSON body does not match it, so the mapper rejects the document.
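
If the clash really is that "_id" field inside the event, one workaround I could imagine (an untested sketch; "doc_id" is just a placeholder field name I made up) is to rename the field before indexing, or to reuse its value explicitly as the document id:

filter {
        mutate {
                # Rename the conflicting "_id" field so it no longer collides
                # with Elasticsearch's internal _id metadata field
                rename => { "_id" => "doc_id" }
        }
}
output {
        elasticsearch {
                host => "*.*.*.*"
                protocol => "http"
                # Alternative to the mutate above: reuse the original value
                # as the Elasticsearch document id
                # document_id => "%{_id}"
        }
}

With document_id the documents would keep their original ids (which should also make re-running the pipeline idempotent), while the mutate approach lets Elasticsearch keep generating its own ids.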