S3 input missing files

Hi, we are trying to set up Logstash to read ELB logs stored in an S3 bucket. The logs are split across several folders inside the bucket, and we're having some difficulty configuring the input. When reading from a single folder (using the prefix parameter) it works fine, but when we read from all folders it seems to miss a lot of the files. Does the configuration below look correct for this?

input {
  s3 {
    "access_key_id" => "XXXXXXXXXXXXXXXXXXXX"
    "secret_access_key" => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    "region" => "eu-west-2"
    "bucket" => "elb-logs-euwest2"
    "exclude_pattern" => "ELBAccessLogTestFile|2017|2018"
    "interval" => 300
    "additional_settings" => {
      "force_path_style" => true
      "follow_redirects" => false
    }
  }
}

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_in_log}" }
  }
  date {
    match => [ "timestamp_in_log", "ISO8601" ]
  }
  mutate {
    add_field => { "brand" => "%{[@metadata][s3][key]}" }
  }
  mutate {
    gsub => [ "brand", "^([^\/]*).*", "\1" ]
  }
}

output {
  elasticsearch { hosts => ["elasticsearch1:9200", "elasticsearch2:9200", "elasticsearch3:9200"] }
}

I wonder if there are entries in the sincedb that LS interprets as having been read already?

I should add that we only see this for new data. When we started the pipeline and it loaded the historical data, nothing was missed, but now when a new file is written to S3 there is a chance it will be skipped.

Also, some of the folders have more missed files than others, and on closer inspection this seems more severe for the ones that sort lower alphabetically. I'm not sure how Logstash checks for new files, but if it searches each folder sequentially and we get a batch of files arriving at around the same time, maybe it's updating the sincedb from the first folder before checking the next?

IIUC S3 does not really have actual folders; the folder structure is an illusion created by the key names. There is just a bucket and objects.
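
For illustration, here is a minimal sketch with the aws-sdk-s3 gem (the key prefix is made up): listing with a prefix is just a flat string-prefix filter over object keys, not a walk of real directories.

require "aws-sdk-s3"

# The prefix and key layout here are hypothetical, for illustration only.
bucket = Aws::S3::Resource.new(region: "eu-west-2").bucket("elb-logs-euwest2")

# "Folders" are just a naming convention: every object key is a flat string
# like "brand-a/AWSLogs/2019/06/01/file.log", and prefix filtering is a plain
# string-prefix match over those keys.
bucket.objects(prefix: "brand-a/").each do |summary|
  puts "#{summary.key} (last_modified: #{summary.last_modified})"
end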

  Stud.interval(@interval) do
    process_files(queue)
    stop unless @watch_for_new_files
  end

This is the run loop.
It translates to...
Every 300 seconds (the configured interval), process the files into the queue, then stop unless we are watching for new files (watch_for_new_files defaults to true).
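
Roughly, Stud.interval behaves like the plain-Ruby loop below (a simplified sketch of the effective behaviour, not the stud gem's actual implementation; it ignores interruption handling):

# Simplified approximation of Stud.interval(@interval) { ... } for illustration.
def interval_loop(interval)
  loop do
    started = Time.now
    yield
    elapsed = Time.now - started
    # Sleep off whatever is left of the interval before the next pass.
    sleep(interval - elapsed) if elapsed < interval
  end
end

interval_loop(300) { puts "checking the bucket for new objects..." }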

  def process_files(queue)
    objects = list_new_files
    objects.each do |key|
      if stop?
        break
      else
        @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
        process_log(queue, key)
      end
    end
  end

This is the process_files loop...
Iterate over any newly found S3 objects, break out of the loop if the pipeline is stopping, otherwise process each object.

  def list_new_files
    objects = {}
    found = false
    begin
      @s3bucket.objects(:prefix => @prefix).each do |log|
        found = true
        @logger.debug("S3 input: Found key", :key => log.key)
        if ignore_filename?(log.key)
          @logger.debug('S3 input: Ignoring', :key => log.key)
        elsif log.content_length <= 0
          @logger.debug('S3 Input: Object Zero Length', :key => log.key)
        elsif !sincedb.newer?(log.last_modified)
          @logger.debug('S3 Input: Object Not Modified', :key => log.key)
        elsif log.storage_class.start_with?('GLACIER')
          @logger.debug('S3 Input: Object Archived to Glacier', :key => log.key)
        else
          objects[log.key] = log.last_modified
          @logger.debug("S3 input: Adding to objects[]", :key => log.key)
          @logger.debug("objects[] length is: ", :length => objects.length)
        end
      end
      @logger.info('S3 input: No files found in bucket', :prefix => prefix) unless found
    rescue Aws::Errors::ServiceError => e
      @logger.error("S3 input: Unable to list objects in bucket", :prefix => prefix, :message => e.message)
    end
    objects.keys.sort {|a,b| objects[a] <=> objects[b]}
  end

This is the list_new_files loop...
Loop through all objects in the bucket (no prefix is given) and test whether each object can be ignored/rejected:

- does the exclude_pattern apply? (see the example after this list)
- is the object empty (zero content length)?
- was the object last modified before the newest file we processed last time?
- is the object archived to Glacier?

If any of these apply, log a debug message; otherwise record the object by key and last-modified time. Any AWS service error is rescued with an ERROR log message. Finally, sort the keys ascending by last-modified time and return the list of keys.
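
On the first check: as far as I can tell, the plugin compiles exclude_pattern with Regexp.new and matches it against the full object key, so the pattern from the config above drops any key containing ELBAccessLogTestFile, 2017 or 2018. A quick sketch with made-up keys:

pattern = Regexp.new("ELBAccessLogTestFile|2017|2018")

# Hypothetical keys, just to show that the pattern is applied to the whole key string.
[
  "brand-a/AWSLogs/123456789012/ELBAccessLogTestFile",
  "brand-a/AWSLogs/123456789012/elasticloadbalancing/eu-west-2/2018/12/31/old.log",
  "brand-b/AWSLogs/123456789012/elasticloadbalancing/eu-west-2/2019/06/01/new.log"
].each do |key|
  puts "#{key} => #{pattern.match?(key) ? 'ignored' : 'kept'}"
end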

Therefore, to work out why files are being skipped/rejected, you need to check the files you see being overlooked against the questions above.
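
For example, for an overlooked key you could compare the object's last-modified time with the timestamp the plugin has persisted. The key and sincedb path below are placeholders, and I'm assuming the sincedb file holds a single parseable timestamp (check your file to confirm the format):

require "aws-sdk-s3"
require "time"

# Hypothetical key and sincedb path, for illustration only.
s3 = Aws::S3::Client.new(region: "eu-west-2")
head = s3.head_object(bucket: "elb-logs-euwest2",
                      key: "brand-b/AWSLogs/123456789012/elasticloadbalancing/eu-west-2/2019/06/10/missed-file.log")

# Assumption: the sincedb file contains a single timestamp string.
sincedb_time = Time.parse(File.read("/path/to/your/sincedb").strip)

puts "object last_modified:    #{head.last_modified}"
puts "sincedb timestamp:       #{sincedb_time}"
puts "would be considered new? #{head.last_modified > sincedb_time}"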

I think the problem lies with the check "was the object last modified before the newest file we processed last time?". On subsequent runs, any file whose last-modified time is older than the newest file processed in the previous run will be overlooked.
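
A minimal sketch of that behaviour, assuming the sincedb keeps only a single "newest last_modified seen" timestamp (the class below is illustrative, not the plugin's actual code):

require "time"

# Illustrative single-timestamp sincedb -- not the plugin's real class.
class SingleTimestampSinceDB
  def initialize
    @newest = Time.at(0)
  end

  # An object counts as "new" only if it was modified after the newest object seen so far.
  def newer?(last_modified)
    last_modified > @newest
  end

  def record(last_modified)
    @newest = last_modified if last_modified > @newest
  end
end

db = SingleTimestampSinceDB.new
db.record(Time.parse("2019-06-10 10:05:00 UTC"))   # previous run ended on a 10:05 object

# A file under another prefix, written slightly earlier but only listed on the next run,
# now fails the newer? check and is skipped:
puts db.newer?(Time.parse("2019-06-10 10:03:00 UTC"))   # => false

In other words (assuming a single stored timestamp), the check is global across the whole bucket rather than per folder/prefix.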
