S3 input not passing metadata

Hi,

I'm having a weird issue. When Logstash pulls messages down from S3 and passes them over to Elasticsearch, not all of the messages get their S3 metadata key value. Some messages end up with the literal %{[@metadata][s3][key]} as their file field, whereas others get the actual file name. The ones that don't get the file field added are also missing the logtype field, unlike the working ones, which get the correct type set by the input.

I tried moving the add_field up into the input, but that failed too; I'm guessing the field isn't added to the message until the input is done reading it? Running Logstash in debug mode doesn't yield any more information. Every file it pulls in has a key. I'm at a loss. Has anyone seen anything like this?

[2019-07-18T14:42:34,838][DEBUG][logstash.inputs.s3       ] S3 input: Found key {:key=>"elasticmapreduce/clusterid/containers/application_11111111111111_0219/container_e02_11111111111111_0219_03_000022/stderr.gz"}
[2019-07-18T14:42:34,839][DEBUG][logstash.inputs.s3       ] S3 input: Adding to objects[] {:key=>"elasticmapreduce/clusterid/containers/application_11111111111111_0219/container_e02_11111111111111_0219_03_000022/stderr.gz"}
[2019-07-18T14:42:34,839][DEBUG][logstash.inputs.s3       ] objects[] length is:  {:length=>21347}

Here is the entirety of my config:

input {
  s3 {
    bucket => "*******"
    access_key_id => "*******"
    secret_access_key => "*******"
    region => "us-west-2"
    include_object_properties => true
    prefix => "elasticmapreduce/cluster-id/containers/"
    temporary_directory => "/tmp/logstash/clusterid"
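    # every line matches this pattern, so the codec folds the whole file into one event (subject to its max_lines/max_bytes limits)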
    codec => multiline {
       pattern => "^%{GREEDYDATA}"
       negate => false
       what => previous
    }
    add_field => { logtype => "s3.emr" }
    add_field => { "environment" => "clusterid" }
  }
}
filter {
  mutate {
    add_field => { "file" => "%{[@metadata][s3][key]}" }
  }
}
output {
  elasticsearch {
    hosts => "*******"
    ssl => true
  }
}

So, a minor addition: I changed the output to stdout and noticed that the messages failing to get the filename added are the shorter ones, whereas the longer ones that get tagged multiline_codec_max_lines_reached are tagged correctly and do get the filename added...

It looks like you want to ingest the entire file as a single event. If the file is large enough to get broken into two events then the second one is missing the metadata. I wonder if that is related to this issue.

Are you able to increase max_bytes and max_lines on the codec to the point where this becomes a non-issue?
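
For example, something along these lines (just a sketch; the numbers are placeholders you would set above the size of your largest file; the codec defaults are max_lines 500 and max_bytes "10 MiB"):

    codec => multiline {
       pattern => "^%{GREEDYDATA}"
       negate => false
       what => previous
       max_lines => 20000
       max_bytes => "100 MiB"
    }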

You are correct, I do want to ingest the entire file as a single event. The files being written to S3 are written as complete objects. I tried commenting out the multiline codec, and that brings over the field on all messages, but it doesn't align with my intent to ingest each file as one message. I then tried adding max_lines to the config at various values (even down to 5): when the limit is triggered, the filename gets added; when it isn't triggered, the filename isn't added. Here is some output with max_lines set to 5:

{
        "logtype" => "s3.emr",
    "environment" => "clusterid",
     "@timestamp" => 2019-07-18T15:26:19.611Z,
        "message" => "2019-07-17T22:59:17.267+0000: [CMS-concurrent-preclean: 0.007/0.007 secs] [Times: user=0.04 sys=0.00, real=0.01 secs] \n2019-07-17T22:59:17.267+0000: [CMS-concurrent-abortable-preclean-start]\n CMS: abort preclean due to time 2019-07-17T22:59:22.352+0000: [CMS-concurrent-abortable-preclean: 2.876/5.085 secs] [Times: user=6.35 sys=0.25, real=5.08 secs] \n2019-07-17T22:59:22.353+0000: [GC (CMS Final Remark) [YG occupancy: 422196 K (629120 K)]2019-07-17T22:59:22.353+0000: [Rescan (parallel) , 0.0296155 secs]2019-07-17T22:59:22.383+0000: [weak refs processing, 0.0000560 secs]2019-07-17T22:59:22.383+0000: [class unloading, 0.0127881 secs]2019-07-17T22:59:22.396+0000: [scrub symbol table, 0.0248678 secs]2019-07-17T22:59:22.420+0000: [scrub string table, 0.0005854 secs][1 CMS-remark: 36530K(1398144K)] 458727K(2027264K), 0.0700406 secs] [Times: user=0.62 sys=0.00, real=0.07 secs] \n2019-07-17T22:59:22.423+0000: [CMS-concurrent-sweep-start]\n2019-07-17T22:59:22.424+0000: [CMS-concurrent-sweep: 0.001/0.001 secs] [Times: user=0.00 sys=0.00, real=0.00 secs] ",
           "file" => "elasticmapreduce/clusterid/containers/application_111111111111_0090/container_e02_111111111111_0090_01_000088/stdout.gz",
           "tags" => [
        [0] "multiline",
        [1] "multiline_codec_max_lines_reached"
    ],
       "@version" => "1"
}
{
    "@timestamp" => 2019-07-18T15:26:19.616Z,
       "message" => "  from space 69888K, 100% used [0x00000002e6660000, 0x00000002eaaa0000, 0x00000002eaaa0000)\n  to   space 69888K,   0% used [0x00000002e2220000, 0x00000002e2220000, 0x00000002e6660000)\n concurrent mark-sweep generation total 1398144K, used 98407K [0x000000036b990000, 0x00000003c0ef0000, 0x00000007c0000000)\n Metaspace       used 60287K, capacity 61168K, committed 61328K, reserved 1101824K\n  class space    used 7793K, capacity 7995K, committed 8080K, reserved 1048576K",
          "file" => "%{[@metadata][s3][key]}",
          "tags" => [
        [0] "multiline"
    ],
      "@version" => "1"
}

You should be increasing max_lines to the point where you never get a multiline_codec_max_lines_reached tag. Only the first event produced by the codec has the metadata, so you want to make sure it only ever produces one event.

Is there a way to keep it from breaking up the message? In other words, if the message hits max_lines, it should be done reading the file. When I set the max to 1000, the filename doesn't get applied.

Or, I guess to ask the more obvious question: how should I ingest an entire file from S3 so that each file is its own message? As you can see in the example below, some lines are in different formats, so triggering multiline off a regex is a nightmare.

Here is one of the raw log files that I see frequently.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt3/yarn/usercache/livy/filecache/32/__spark_libs__5369727955251818709.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/07/17 23:05:37 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
19/07/17 23:05:37 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
19/07/17 23:05:37 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
19/07/17 23:05:37 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
19/07/17 23:05:37 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
19/07/17 23:05:38 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
19/07/17 23:05:38 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
19/07/17 23:05:38 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
19/07/17 23:05:38 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
19/07/17 23:05:46 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
19/07/17 23:05:46 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
19/07/17 23:05:50 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
19/07/17 23:07:54 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

Oh, now I get it. To me it is clearly a bug.

Normally, each event that the codec produces gets the metadata added to it. When the input reaches the end of the file it calls @codec.flush, but it does not add metadata to the flushed event. Basically, all of the processing done for the line-by-line handling also has to be done for the flushed event. So it is indeed that issue 153, but the PR only fixes decoration; it does not add metadata.

If you try to merge the entire file into a single event it will get flushed, but it will have no metadata.

I cannot see a workaround for that.
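
At most you could avoid the misleading literal value and mark the affected events, for example by only adding the file field when the metadata actually exists (a sketch; the s3_key_missing tag is just an example name):

    filter {
      if [@metadata][s3][key] {
        mutate { add_field => { "file" => "%{[@metadata][s3][key]}" } }
      } else {
        mutate { add_tag => [ "s3_key_missing" ] }
      }
    }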

So, do you have any recommendation on what to do? It looks like that issue has been open for over a year, with no movement.

I am not saying it is a good approach, but it would be possible to merge all the lines of a file back together using an aggregate filter with 'task_id => "%{[@metadata][s3][key]}"'. Some lines will be out of order, but if you can live with that then just remove the multiline codec.

Seems reasonable. Do you have a suggestion for the code => section? I'm not seeing a relevant example of putting the message back together (even out of order).

Try something like this. Notice that the only fields on the event will be what is in the map, so if you need other fields, modify the code option to add them.

    aggregate {
        task_id => "%{[@metadata][s3][key]}"
        code => '
            unless map["@timestamp"]
                map["@timestamp"] = event.get("@timestamp")
            end

            # Need to add literal newline to separate lines
            map["message"] ||= ""
            map["message"] += event.get("message") + "
"

            event.cancel
        '
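        # when the task for this key times out, the accumulated map is pushed as a single event, with the key written back to [@metadata][s3][key]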
        push_map_as_event_on_timeout => true
        timeout => 10
        timeout_task_id_field => "[@metadata][s3][key]"
    }

This worked. And as you stated, there are some logs where the lines are out of order. I'm checking with the developer to see if they are okay with this, but it's a thumbs up from me.

As a follow-on: in this mode, does Logstash keep all the files open for new lines? If so, does the whole file get re-read, or just the new line(s)? Apparently the files are getting written to S3 every 5 minutes, so some of them may be complete, but some could still be appended to.

Ah, no. read mode is designed to read files that are finished with. If files are being appended to, you have to use tail mode.

I'm not seeing that as an available option for the S3 input; only the file input plugin :frowning:

Sorry, I was confusing this thread with another thread I have been on a back-and-forth with this afternoon.

No worries. I just had the developer run a long-running job and examined the files that got pulled over, and I found what we expected: there were only the expected two files in the S3 "subfolder", but later, when the application finished, the file in S3 was updated, at which point Logstash picked up the updated file and re-ingested it. We're fine with that outcome for now, primarily because we need something rather than nothing, and there is a lot of other work to be done to clean up the logs and allow for more appropriate parsing. Thank you for your help getting me to this point. I will post my updated conf when the running ingestion is done.

Thanks again for your help with this, and my other posts, @Badger :slight_smile: Here is my current running config:

input {
  s3 {
    bucket => "********"
    access_key_id => "********"
    secret_access_key => "********"
    region => "********"
    include_object_properties => true
    prefix => "elasticmapreduce/clusterid/containers/"
    temporary_directory => "/tmp/logstash/clusterid"
    add_field => { logtype => "s3.emr" }
    add_field => { "environment" => "env_id" }
  }
}
filter {
  if [logtype] == "s3.emr" {
    aggregate {
      task_id => "%{[@metadata][s3][key]}"
      code => '
          unless map["@timestamp"]
              map["@timestamp"] = event.get("@timestamp")
          end

          # Need to add literal newline to separate lines
          map["message"] ||= ""
          map["message"] += event.get("message") + "
"

          event.cancel
      '
      push_map_as_event_on_timeout => true
      timeout => 10
      timeout_task_id_field => "[@metadata][s3][key]"
    }
    mutate {
      add_field => { "file" => "%{[@metadata][s3][key]}" }
    }
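    # split the S3 key into its path components (parent folder, cluster, application, container, file name)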
    if [file] {
      mutate {
        copy => { "file" => "file_tmp" }
      }
      mutate {
        split => [ "file_tmp", "/" ]
      }
      mutate {
        add_field => {
          "parent_folder" => "%{[file_tmp][0]}"
          "cluster_id" => "%{[file_tmp][1]}"
          "subfolder_name1" => "%{[file_tmp][2]}"
          "application_id" => "%{[file_tmp][3]}"
          "container_id" => "%{[file_tmp][4]}"
          "filename" => "%{[file_tmp][5]}"
        }
      }
      mutate {
        remove_field => [ "file_tmp" ]
      }
    }
  }
}
output {
  elasticsearch {
    hosts => "********"
    ssl => true
  }
}