Logstash s3 input plugin error

Environment:
RHEL 6.8
Logstash 2.4.0
Kibana 4.5.4
Amazon Elasticsearch Service 2.3

I've researched the error I'm getting, and while many people have seen similar errors, I haven't found anyone with this exact one. I've tried several configs that worked for others, but none of them work for me. I was originally on Logstash 2.3.x and upgraded to 2.4 hoping that would fix the problem, but I still get the same error.

I'm trying to ingest CloudTrail logs using the s3 input plugin, the cloudtrail codec, and the amazon_es output plugin. I'm getting the following error:

Settings: Default pipeline workers: 2
Pipeline main started
A plugin had an unrecoverable error. Will restart this plugin.
Plugin: <LogStash::Inputs::S3 bucket=>"aws-logs", delete=>false, interval=>60, prefix=>"AWSLogs/xxxxxxxxxxxx/CloudTrail-Digest/us-east-1/", type=>"cloudtrail", codec=><LogStash::Codecs::CloudTrail spool_size=>50>, sincedb_path=>"/opt/logstash/sincedb", region=>"us-east-1", use_ssl=>true, temporary_directory=>"/tmp/logstash">
Error: undefined method `each' for nil:NilClass {:level=>:error}
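
For anyone who wants to reproduce just the input stage, a stripped-down config along these lines (same placeholder bucket/prefix as my full config below, with a stdout output instead of amazon_es) exercises only the s3 input and cloudtrail codec:

input {
  s3 {
    bucket => "aws-logs"
    region => "us-east-1"
    codec  => "cloudtrail"
    prefix => "AWSLogs/xxxxxxxxxxxx/CloudTrail-Digest/us-east-1/"
  }
}
output {
  # Print decoded events to the console for inspection
  stdout { codec => rubydebug }
}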

My config file is below (I'm using an IAM role with full permissions to S3, hence no credentials in the config):
input {
  s3 {
    bucket => "aws-logs"
    region => "us-east-1"
    codec  => "cloudtrail"
    type   => "cloudtrail"
    prefix => "AWSLogs/xxxxxxxxxxxx/CloudTrail-Digest/us-east-1"
  }
}

filter {
  if [type] == "cloudtrail" {
    grok {
      match => [ "@message", "\"eventVersion\":\"%{GREEDYDATA:eventVersion}\",\"errorCode\":\"%{GREEDYDATA:errorCode}\",\"eventTime\":\"%{GREEDYDATA:eventTime}\",\"requestParameters\":\"%{GREEDYDATA:requestParameters}\",\"errorMessage\":\"%{GREEDYDATA:errorMessage}\",\"responseElements\":\"%{GREEDYDATA:responseElements}\",\"eventName\":\"%{GREEDYDATA:eventName}\",\"userIdentity\":\"%{GREEDYDATA:userIdentity}\",\"eventSource\":\"%{GREEDYDATA:eventSource}\",\"userAgent\":\"%{GREEDYDATA:userAgent}\",\"sourceIPAddress\":\"%{GREEDYDATA:sourceIPAddress}\"" ]
    }
    geoip {
      source => "sourceIPAddress"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}

output {
  amazon_es {
    hosts => "search-myesdomain.us-east-1.es.amazonaws.com"
    region => "us-east-1"
    index => "cwl-%{+YYYY.MM.dd}"
    flush_size => 250000
    port => 80
    protocol => "http"
  }
}
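
The config does parse (the log above shows "Pipeline main started"). For what it's worth, here's how I check the syntax before running it (assuming the file lives at /etc/logstash/conf.d/cloudtrail.conf and Logstash is installed under /opt/logstash, as the RPM does; adjust paths as needed):

# Syntax check only; does not start the pipeline
/opt/logstash/bin/logstash --configtest -f /etc/logstash/conf.d/cloudtrail.conf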

I also have a Lambda function that pushes CloudTrail logs to Amazon ES, and that works great, but when I turn off the Lambda function and use Logstash instead, it breaks. The reason for using Logstash is to enrich my logs with geoip information, which I don't know how to do with Lambda.

I have gotten some logs into ES with Logstash using this config, but it's usually just a brief blip, after which it stops pushing data to ES.

Any help is much appreciated!