Hi,
I'm having some weird issues. Below is my config. I'm finding that if I leave the `if [type] == "s3_emr"` conditional in the output section, nothing is output at all; however, if I comment it out, the gelf output kicks in and starts sending messages to our Graylog server. If I comment out the gelf output and leave only the stdout line (with the conditional still in place), I still get no output. I know the type field is being added, because the filter section kicks in and does all the other processing I have it do. Lastly, the environment field is not being added to messages.
Any help on this would be greatly appreciated.
# Four S3 inputs, one per EMR cluster. Each input polls a distinct prefix,
# tags its events with type "s3_emr", and stamps an "environment" field
# identifying which cluster the log line came from.
# NOTE(review): "type" and "add_field" apply only to the raw per-line events
# read from S3; fields set here do NOT automatically carry over to events
# that the aggregate filter later generates on timeout.
input {
# Cluster 1
s3 {
bucket => "*********"
access_key_id => "*********"
secret_access_key => "*********"
region => "*********"
include_object_properties => true
prefix => "elasticmapreduce/clusterid_1/containers/"
temporary_directory => "/tmp/logstash/1"
add_field => { "environment" => "1" }
type => "s3_emr"
}
# Cluster 2
s3 {
bucket => "*********"
access_key_id => "*********"
secret_access_key => "*********"
region => "*********"
include_object_properties => true
prefix => "elasticmapreduce/clusterid_2/containers/"
temporary_directory => "/tmp/logstash/2"
add_field => { "environment" => "2" }
type => "s3_emr"
}
# Cluster 3
s3 {
bucket => "*********"
access_key_id => "*********"
secret_access_key => "*********"
region => "*********"
include_object_properties => true
prefix => "elasticmapreduce/clusterid_3/containers/"
temporary_directory => "/tmp/logstash/3"
add_field => { "environment" => "3" }
type => "s3_emr"
}
# Cluster 4
s3 {
bucket => "*********"
access_key_id => "*********"
secret_access_key => "*********"
region => "*********"
include_object_properties => true
prefix => "elasticmapreduce/clusterid_4/containers/"
temporary_directory => "/tmp/logstash/4"
add_field => { "environment" => "4" }
type => "s3_emr"
}
}
filter {
  if [type] == "s3_emr" {
    # Collapse all lines of one S3 object (keyed by its object key) into a
    # single event, emitted when no new line arrives for `timeout` seconds.
    aggregate {
      task_id => "%{[@metadata][s3][key]}"
      code => '
        # BUG FIX: the event pushed by push_map_as_event_on_timeout is built
        # ONLY from the contents of the map. "type" and "environment" are set
        # on the raw per-line events by the s3 inputs, so unless we copy them
        # into the map here, the aggregated event has neither field — it then
        # fails every `if [type] == "s3_emr"` conditional (so nothing reaches
        # the outputs) and loses its environment. (Equivalently, timeout_code
        # could re-set these fields on the generated event.)
        map["type"] ||= event.get("type")
        map["environment"] ||= event.get("environment")
        # Keep the timestamp of the first line seen for this object.
        unless map["@timestamp"]
          map["@timestamp"] = event.get("@timestamp")
        end
        # Re-join the lines with a newline separator. The code option is
        # single-quoted, so "\n" is an ordinary Ruby escape here — no fragile
        # literal line break needed in the config file.
        map["message"] ||= ""
        map["message"] += event.get("message") + "\n"
        # Drop the per-line event; only the aggregated event survives.
        event.cancel
      '
      push_map_as_event_on_timeout => true
      timeout => 10
      # Put the task id (the S3 object key) back on the aggregated event so
      # downstream filters can still read [@metadata][s3][key].
      timeout_task_id_field => "[@metadata][s3][key]"
    }
    # Expose the S3 object key as a regular (non-metadata) field.
    mutate {
      add_field => { "fullpath" => "%{[@metadata][s3][key]}" }
    }
    if [fullpath] {
      mutate {
        copy => { "fullpath" => "fullpath_tmp" }
      }
      mutate {
        split => [ "fullpath_tmp" , "/" ]
      }
      # Map each path segment of
      # <parent>/<cluster>/<subfolder>/<application>/<container>/<file>
      # to a named field.
      mutate {
        add_field => {
          "parent_folder" => "%{[fullpath_tmp][0]}"
          "cluster_id" => "%{[fullpath_tmp][1]}"
          "subfolder_name1" => "%{[fullpath_tmp][2]}"
          "application_id" => "%{[fullpath_tmp][3]}"
          "container_id" => "%{[fullpath_tmp][4]}"
          "filename" => "%{[fullpath_tmp][5]}"
        }
      }
      mutate {
        remove_field => [ "fullpath_tmp" ]
      }
    }
    mutate {
      # Hash form — the array form of add_field is deprecated. Same behavior:
      # the literal string "hostname" is added (use "%{host}" if the actual
      # node name is wanted — TODO confirm intent).
      add_field => { "logstash_node" => "hostname" }
    }
  }
}
output {
# NOTE(review): events generated by the aggregate filter's
# push_map_as_event_on_timeout contain ONLY the fields stored in the map.
# If "type" is not copied into the map inside the aggregate code block,
# the aggregated events fail this conditional and are silently dropped —
# neither stdout nor gelf will emit anything.
if [type] == "s3_emr" {
# Debug output; safe to remove once gelf delivery is confirmed.
stdout {}
# Ship aggregated events to Graylog over GELF (UDP by default).
gelf {
host => "graylog.host"
port => 12201
}
}
}
Below is a message that comes out if I remove the `if [type] == "s3_emr"` conditional from the output section and use only the stdout output:
{
"subfolder_name1" => "containers",
"@version" => "1",
"@timestamp" => 2019-07-25T16:16:36.285Z,
"message" => "SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]\n19/07/24 14:59:50 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!\nSLF4J: Found binding in [jar:file:/mnt/yarn/usercache/livy/filecache/89/__spark_libs__5386156215855994904.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.\n19/07/24 14:59:50 WARN FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.\n",
"application_id" => "application_11111111_0017",
"fullpath" => "elasticmapreduce/cluster1/containers/application_11111111_0017/container_11111111_0017_01_000001/stderr.gz",
"cluster_id" => "cluster1",
"parent_folder" => "elasticmapreduce",
"filename" => "stderr.gz",
"logstash_node" => "hostname",
"container_id" => "container_11111111_0017_01_000001"
}