IIUC, S3 does not really have folders. The folder hierarchy is an illusion created by the key names; there is just a bucket and objects (keys).
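For example, a minimal sketch with the aws-sdk-s3 gem (the bucket name, region, and prefix are hypothetical): listing with a :prefix is a plain string match on the key, which is all a "folder" ever is.

require 'aws-sdk-s3'

# Listing under 'logs/2023/' is a string-prefix match on keys,
# not a directory traversal -- S3 has no directories to traverse.
s3 = Aws::S3::Resource.new(region: 'us-east-1')
s3.bucket('my-bucket').objects(prefix: 'logs/2023/').each do |obj|
  puts obj.key # e.g. "logs/2023/05/01/app.log" -- one flat key
end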
Stud.interval(@interval) do
  process_files(queue)
  stop unless @watch_for_new_files
end
This is the run loop.
It translates to: every @interval seconds (300 in this case), process the files into the queue, then stop unless we are watching for new files (@watch_for_new_files defaults to true).
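Stud is a small utility gem that Logstash uses for this kind of loop. As a rough sketch (simplified; the real Stud.interval also compensates for sleep drift and supports options), it amounts to:

# Run the block, sleep whatever is left of the interval, repeat --
# until something inside the block (here the plugin's stop) breaks out.
def interval(duration)
  loop do
    started = Time.now
    yield
    elapsed = Time.now - started
    sleep(duration - elapsed) if elapsed < duration
  end
end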
def process_files(queue)
  objects = list_new_files
  objects.each do |key|
    if stop?
      break
    else
      @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
      process_log(queue, key)
    end
  end
end
This is the process_files loop...
It iterates over the newly found S3 objects, breaking out of the loop if the pipeline is stopping, and otherwise processing each object.
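The stop? predicate is not defined in this plugin; it comes from the Logstash input base class. A minimal sketch of the idea (an assumption based on LogStash::Inputs::Base, not code from this plugin): the pipeline flips an atomic flag on shutdown, and long-running loops poll it each iteration so they can exit promptly.

require 'concurrent'

class SketchInput
  def initialize
    @stop_called = Concurrent::AtomicBoolean.new(false)
  end

  # Called by the pipeline when shutdown is requested.
  def do_stop
    @stop_called.make_true
  end

  # Polled by run loops between units of work.
  def stop?
    @stop_called.value
  end
end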
def list_new_files
  objects = {}
  found = false
  begin
    @s3bucket.objects(:prefix => @prefix).each do |log|
      found = true
      @logger.debug("S3 input: Found key", :key => log.key)
      if ignore_filename?(log.key)
        @logger.debug('S3 input: Ignoring', :key => log.key)
      elsif log.content_length <= 0
        @logger.debug('S3 Input: Object Zero Length', :key => log.key)
      elsif !sincedb.newer?(log.last_modified)
        @logger.debug('S3 Input: Object Not Modified', :key => log.key)
      elsif log.storage_class.start_with?('GLACIER')
        @logger.debug('S3 Input: Object Archived to Glacier', :key => log.key)
      else
        objects[log.key] = log.last_modified
        @logger.debug("S3 input: Adding to objects[]", :key => log.key)
        @logger.debug("objects[] length is: ", :length => objects.length)
      end
    end
    @logger.info('S3 input: No files found in bucket', :prefix => prefix) unless found
  rescue Aws::Errors::ServiceError => e
    @logger.error("S3 input: Unable to list objects in bucket", :prefix => prefix, :message => e.message)
  end
  objects.keys.sort {|a,b| objects[a] <=> objects[b]}
end
This is the list_new_files loop...
Loop through all the objects in the bucket (all of them here, since no prefix is given) and test whether each object can be ignored/rejected:
does the exclude_pattern apply (ignore_filename?)?
is the object empty (content length of zero)?
was the object last modified at or before the newest file we processed on the previous run? (see the sketch after this list)
is the object archived to Glacier?
Yes? Log a debug message and skip it.
No? Record the object by key and last-modified time.
Rescue any AWS service errors with an ERROR log message.
Sort the keys ascending by last-modified time and return the list of keys.
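Note that the sincedb here is not the per-file offset sincedb of the file input: it stores one timestamp, the last_modified of the newest object processed so far. A simplified sketch of it (condensed; error handling and the empty-file case omitted):

require 'time'

class SinceDB
  def initialize(path)
    @path = path
  end

  # The single stored value: last_modified of the newest object processed.
  def read
    File.exist?(@path) ? Time.parse(File.read(@path).chomp) : Time.new(0)
  end

  # An object counts as "new" only if strictly newer than that timestamp.
  def newer?(date)
    date > read
  end

  def write(since)
    File.open(@path, 'w') { |file| file.write(since.to_s) }
  end
end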
Therefore, to work out why files are being skipped/rejected, you need to check the overlooked files against the questions above.
I think the problem lies with "was the object last modified at or before the newest file we processed on the previous run?". On subsequent runs, any file found with a modified time older than the newest (most recently modified) file processed last time will be overlooked.
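A hypothetical illustration of that failure mode (the timestamps are invented): after run 1 the sincedb holds 10:05, so an object uploaded later but stamped 10:03 fails the newer? test on run 2 and is never picked up.

require 'time'

sincedb_time = Time.parse('2023-05-01 10:05:00 UTC') # newest object seen in run 1
late_arrival = Time.parse('2023-05-01 10:03:00 UTC') # uploaded after run 1 finished

# The sincedb.newer? comparison from list_new_files:
puts late_arrival > sincedb_time # => false, so the object is skipped every run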