Weird field/output issues

Hi,

I'm having some weird issues; my config is below. If I keep the `if [type] == "s3_emr"` conditional in the output, nothing gets emitted; if I comment it out, the gelf output kicks in and starts sending messages to our Graylog server. If I instead comment out the gelf output and leave only the stdout line, I still get no output. I know the type field is getting added, because the filter conditional kicks in and does all the other magic I'm having it do. Lastly, the environment field is not getting added to messages.

Any help on this would be greatly appreciated.

input {
  s3 {
    bucket => "*********"
    access_key_id => "*********"
    secret_access_key => "*********"
    region => "*********"
    include_object_properties => true
    prefix => "elasticmapreduce/clusterid_1/containers/"
    temporary_directory => "/tmp/logstash/1"
    add_field => { "environment" => "1" }
    type => "s3_emr"
  }
  s3 {
    bucket => "*********"
    access_key_id => "*********"
    secret_access_key => "*********"
    region => "*********"
    include_object_properties => true
    prefix => "elasticmapreduce/clusterid_2/containers/"
    temporary_directory => "/tmp/logstash/2"
    add_field => { "environment" => "2" }
    type => "s3_emr"
  }
  s3 {
    bucket => "*********"
    access_key_id => "*********"
    secret_access_key => "*********"
    region => "*********"
    include_object_properties => true
    prefix => "elasticmapreduce/clusterid_3/containers/"
    temporary_directory => "/tmp/logstash/3"
    add_field => { "environment" => "3" }
    type => "s3_emr"
  }
  s3 {
    bucket => "*********"
    access_key_id => "*********"
    secret_access_key => "*********"
    region => "*********"
    include_object_properties => true
    prefix => "elasticmapreduce/clusterid_4/containers/"
    temporary_directory => "/tmp/logstash/4"
    add_field => { "environment" => "4" }
    type => "s3_emr"
  }
}
filter {
  if [type] == "s3_emr" {
    aggregate {
      task_id => "%{[@metadata][s3][key]}"
      code => '
        unless map["@timestamp"]
          map["@timestamp"] = event.get("@timestamp")
        end

        # Need to add a literal newline to separate lines
        map["message"] ||= ""
        map["message"] += event.get("message") + "
"
        event.cancel
      '
      push_map_as_event_on_timeout => true
      timeout => 10
      timeout_task_id_field => "[@metadata][s3][key]"
    }
    mutate {
      add_field => { "fullpath" => "%{[@metadata][s3][key]}" }
    }
    if [fullpath] {
      mutate {
        copy => { "fullpath" => "fullpath_tmp" }
      }
      mutate {
        split => [ "fullpath_tmp" , "/" ]
      }
      mutate {
        add_field => {
          "parent_folder" => "%{[fullpath_tmp][0]}"
          "cluster_id" => "%{[fullpath_tmp][1]}"
          "subfolder_name1" => "%{[fullpath_tmp][2]}"
          "application_id" => "%{[fullpath_tmp][3]}"
          "container_id" => "%{[fullpath_tmp][4]}"
          "filename" => "%{[fullpath_tmp][5]}"
        }
      }
      mutate {
        remove_field => [ "fullpath_tmp" ]
      }
    }
    mutate {
      add_field => { "logstash_node" => "hostname" }
    }
  }
}
output {
  if [type] == "s3_emr" {
    stdout {}
    gelf {
      host => "graylog.host"
      port => 12201
    }
  }
}

Below is a message that comes out if I remove the if conditional and use only stdout:

{
    "subfolder_name1" => "containers",
           "@version" => "1",
         "@timestamp" => 2019-07-25T16:16:36.285Z,
            "message" => "SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]\n19/07/24 14:59:50 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!\nSLF4J: Found binding in [jar:file:/mnt/yarn/usercache/livy/filecache/89/__spark_libs__5386156215855994904.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.\n19/07/24 14:59:50 WARN FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.\n",
     "application_id" => "application_11111111_0017",
           "fullpath" => "elasticmapreduce/cluster1/containers/application_11111111_0017/container_11111111_0017_01_000001/stderr.gz",
         "cluster_id" => "cluster1",
      "parent_folder" => "elasticmapreduce",
           "filename" => "stderr.gz",
      "logstash_node" => "hostname",
       "container_id" => "container_11111111_0017_01_000001"
}

You are using push_map_as_event_on_timeout and cancelling every event in the aggregate filter. In that case, the only fields on the event that aggregate flushes are the timeout_task_id_field and the contents of the map. If you want type and environment to be on those events, add them to the map:

map["type"] = event.get("type")
map["environment"] = event.get("environment")
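
Applied to the config above, the code block in the aggregate filter would become something like this sketch (the `||=` guards are an assumption so the fields are only set once per map; the rest is unchanged):

    code => '
      unless map["@timestamp"]
        map["@timestamp"] = event.get("@timestamp")
      end

      # Carry type and environment from the cancelled events into the
      # flushed map, so the output conditional can still match on [type]
      map["type"] ||= event.get("type")
      map["environment"] ||= event.get("environment")

      # Need to add a literal newline to separate lines
      map["message"] ||= ""
      map["message"] += event.get("message") + "
"
      event.cancel
    '

With type on the flushed event, the `if [type] == "s3_emr"` conditional in the output should match again.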

That was it, thanks!!!
