Catch records and errors that don't go into the Elasticsearch cluster in another bucket

Hi All,

I am trying to transfer data from an S3 bucket to ES using Logstash. Every time a new file is deposited, it is transferred to ES. The following is the config file.

input {
  s3 {
    bucket => "logging-services-ods-dev"
    access_key_id => "*"
    secret_access_key => "*"
    region => "us-east-1"
    prefix => "Engineering_Indexes/platform_dump/"
    codec => "json"
    type => "s3"
  }
}

filter {
  # Copy the S3 object key into a field that will become the index name
  mutate {
    add_field => { "file" => "%{[@metadata][s3][key]}" }
  }
  # Remove the 34-character prefix path from the key
  mutate {
    gsub => [ "file", ".{34}", "" ]
  }
  # Drop the last three characters and keep only the year-month part of the date
  mutate {
    gsub => [
      "file", ".{3}$", "",
      "file", "([0-9]{4}-[0-9]{2})-[0-9]+", "\1"
    ]
  }
}

output {
  amazon_es {
    hosts => ["endpoint"]
    region => "us-east-1"
    aws_access_key_id => '*'
    aws_secret_access_key => '*'
    index => "%{file}"
    template_name => "sqe_template1"
    template_overwrite => "true"
    codec => "json"
  }
}

However, if records are rejected from a file, how can I catch those rejected records, either in a new S3 bucket or by writing to a file? Is it possible to catch the rejected or error records?

Thank You,
Mohit Ruke

It sounds like you want a dead letter queue (DLQ), but that is currently only supported for an elasticsearch output.

Yes, that is what I need for the output. Can you help out with how to integrate it with my current output? The error files or records should be written somewhere.

Thank You

If you look at the code for the elasticsearch output, there is actually very little needed to support a DLQ. It basically says "If you get a 400 or 404 back from ES, then call execution_context.dlq_writer". execution_context.dlq_writer is core Logstash functionality, available to any plugin. So it should be pretty straightforward to modify the amazon_es output to make the same call in the same circumstances.
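
As a rough illustration, a modified amazon_es output could do something like the sketch below when it sees a 400 or 404 in a bulk response. This is not the plugin's real source; the method name handle_failed_action and the surrounding structure are made up for this example and are only modeled on how the elasticsearch output does it. The one piece of real core API used is execution_context.dlq_writer.

# Hypothetical sketch only -- not the actual amazon_es code.
DLQ_CODES = [400, 404]

def handle_failed_action(event, status, action, response)
  if DLQ_CODES.include?(status)
    # execution_context.dlq_writer is provided by Logstash core to every plugin;
    # write() stores the event plus a reason string in the dead letter queue.
    execution_context.dlq_writer.write(
      event,
      "Could not index event to Elasticsearch. status: #{status}, action: #{action}, response: #{response}"
    )
  else
    @logger.warn("Failed action", :status => status, :response => response)
  end
end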

Hey Badger, I tried implementing the Logstash DLQ but I am getting this error:

input {
  dead_letter_queue {
    commit_offsets => true
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "main"
  }
}

output {
  amazon_es {
    hosts => ["https://vpc-log-service-legacy-data-lm4aq2wmxzh4xxj2uukkucykfi.us-east-1.es.amazonaws.com"]
    region => "us-east-1"
    aws_access_key_id => '*'
    aws_secret_access_key => '*'
    index => "dead_letter_queue-error"
  }
}

Error:
[ERROR] 2019-07-03 18:56:01.122 [[main]-pipeline-manager] pipeline - Error registering plugin {:pipeline_id=>"main", :plugin=>"<LogStash::Inputs::DeadLetterQueue pipeline_id=>"main", path=>"/var/lib/logstash/dead_letter_queue", id=>"87551508e23c487ab5776be5570a2ce4189f7ffff5083e2ead7b74203deaf8d7", commit_offsets=>true, enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_ebb78360-e392-4b37-be0d-ef39476d811c", enable_metric=>true, charset=>"UTF-8">>", :error=>"/var/lib/logstash/dead_letter_queue/main", :thread=>"#<Thread:0x24ca9e74 run>"}
[ERROR] 2019-07-03 18:56:01.201 [[main]-pipeline-manager] pipeline - Pipeline aborted due to error {:pipeline_id=>"main", :exception=>java.nio.file.NoSuchFileException: /var/lib/logstash/dead_letter_queue/main, :backtrace=>["sun.nio.fs.UnixException.translateToIOException(sun/nio/fs/UnixException.java:86)", "sun.nio.fs.UnixException.asIOException(sun/nio/fs/UnixException.java:111)", "sun.nio.fs.LinuxWatchService$Poller.implRegister(sun/nio/fs/LinuxWatchService.java:246)", "sun.nio.fs.AbstractPoller.processRequests(sun/nio/fs/AbstractPoller.java:260)", "sun.nio.fs.LinuxWatchService$Poller.run(sun/nio/fs/LinuxWatchService.java:364)", "java.lang.Thread.run(java/lang/Thread.java:748)"], :thread=>"#<Thread:0x24ca9e74 run>"}
[ERROR] 2019-07-03 18:56:01.221 [Converge PipelineAction::Create] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create, action_result: false", :backtrace=>nil}

I think you have misunderstood. There are two sides to the DLQ. It is written to by the elasticsearch output when it gets a 400 or 404 from Elasticsearch. I was suggesting that you could modify the amazon_es output to support the same functionality.

If you did that, then amazon_es would write to the DLQ, which you would be able to consume with a dead_letter_queue input. However, until something writes to the DLQ, the file will not exist and the dead_letter_queue input will log that error.
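
For completeness, once something does write to the DLQ, the consuming side could look roughly like this. The two logstash.yml settings are standard; the DLQ directory path is the one from your config, and the output file path is just an example; an s3 output would work equally well if you want the rejected records in a bucket.

# logstash.yml -- enable the queue so an output has somewhere to write rejected events
dead_letter_queue.enable: true
path.dead_letter_queue: "/var/lib/logstash/dead_letter_queue"

# Example consuming pipeline: read the DLQ for the "main" pipeline and
# write the rejected records to a local file
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "main"
    commit_offsets => true
  }
}
output {
  file {
    path => "/tmp/rejected-records.json"
    codec => "json_lines"
  }
}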
