How can I merge two events in Logstash?

I'm trying to parse a log file into Elasticsearch through Logstash.

I want to send the following log as a single event (i.e., as a single document) into Elasticsearch.

######################################################
ETL Wrapper Initializing - 09/27/2018 06:33:57
######################################################
------------------------------------------------------
Wrapper Information - 09/27/2018 06:33:57
------------------------------------------------------

------------------------------------------------------
Reading Component Log Port Files - 09/27/2018 06:34:53
------------------------------------------------------
-- >     -- > Found 3 files and only merge non-zero byte files
------------------------------------------------------
Renaming Reject Files - 09/27/2018 06:34:56
------------------------------------------------------
######################################################
Sending Notifications - 09/27/2018 06:34:56
######################################################
------------------------------------------------------
Setting Exit Status - 09/27/2018 06:34:56
------------------------------------------------------
######################################################
ETL Wrapper Finalizing - 09/27/2018 06:34:56
######################################################
------------------------------------------------------

Here is my Logstash configuration:

input {
  file {
    path => "D:/logs/file.log"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => { "message" => "ETL Wrapper Initializing - %{DATESTAMP:JobStartTime}" }
    match => { "message" => "ETL Wrapper Finalizing - %{DATESTAMP:JobEndTime}" }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  if [message] =~ /^$/ {
    drop { }
  }
  mutate {
    remove_field => ["@version", "host", "message"]
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "success_index"
  }
  stdout { codec => rubydebug }
}

Output of the above configuration:

{
    "JobStartTime" => "09/27/2018 09:33:41",
      "@timestamp" => 2018-12-05T10:55:44.698Z,
            "path" => "D:/logs/file.log"
}
{
    "JobEndTime" => "09/27/2018 09:34:16",
    "@timestamp" => 2018-12-05T10:55:44.784Z,
          "path" => "D:/logs/file.log"
}

My expected output:

{
    "JobStartTime" => "09/27/2018 09:33:41",
      "@timestamp" => 2018-12-05T10:55:44.698Z,
            "path" => "D:/logs/file.log",
      "JobEndTime" => "09/27/2018 09:34:16"
}

How can I merge "JobStartTime" and "JobEndTime" into a single document?

Any help is appreciated.
Thanks in advance!

-Vinod

Hello vinodanumarla,
Just have a look at the Aggregate filter plugin, which allows you to do such things.

Hello Michel99_7,
Thanks for the response.

I have tried the Aggregate filter. Below is my filter:

aggregate {
  task_id => "%{path}"
  code => "
    map['path'] = event.get('path')
    # initialize each field as an array, then append the value from the current event
    map['JobStartTime'] ||= []
    map['JobStartTime'] << {'JobStartTime' => event.get('JobStartTime')}
    map['JobEndTime'] ||= []
    map['JobEndTime'] << {'JobEndTime' => event.get('JobEndTime')}
    # drop the partial event; only the aggregated map is kept
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3
}

Below is the output:

{
      "JobEndTime" => [
        [0] {
            "JobEndTime" => nil
        },
        [1] {
            "JobEndTime" => "09/27/2018 09:34:16"
        }
    ],
      "@timestamp" => 2018-12-06T09:32:33.412Z,
    "JobStartTime" => [
        [0] {
            "JobStartTime" => "09/27/2018 09:33:41"
        },
        [1] {
            "JobStartTime" => nil
        }
    ],
            "path" => "D:/logs/20180927_093341_PDCDWG1040_lylty_acct_extract.log.SUCCESS"
}

I am able to combine the two docs as above. But can we do it without creating lists like the above, i.e. without the separate nested objects?
Is it possible to get the output as below?

{
    "JobStartTime" => "09/27/2018 09:33:41",
      "@timestamp" => 2018-12-05T10:55:44.698Z,
            "path" => "D:/logs/file.log",
      "JobEndTime" => "09/27/2018 09:34:16"
}

Thanks!
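As a minimal sketch (not taken from the thread, and assuming each log file yields exactly one JobStartTime event and one JobEndTime event), the aggregate code block could keep the fields flat by copying a field onto the map only when the current event actually carries it, instead of appending hashes to arrays:

aggregate {
  task_id => "%{path}"
  code => "
    # keep the path on the map so it survives into the pushed event
    map['path'] = event.get('path')
    # copy each time field only when the current event has it,
    # so the map ends up with plain scalar values instead of arrays
    map['JobStartTime'] = event.get('JobStartTime') if event.get('JobStartTime')
    map['JobEndTime']   = event.get('JobEndTime')   if event.get('JobEndTime')
    # drop the partial event; only the merged map is emitted
    event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3
}

With push_previous_map_as_event => true, the merged map is pushed as a single event when the next task_id (file) starts or when the timeout expires, which should match the flat document shown above.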

Hi Vino,

You can also use a multiline codec in the input section:

input {
  file {
    path => "C:/test/*.log"
    #sincedb_path => "/dev/null"
    start_position => "beginning"
    discover_interval => 5
    close_older => 60
    mode => "tail"
    file_sort_by => "path"
    codec => multiline {
      pattern => "^ETL Wrapper Initializing"
      negate => true
      what => "previous"
    }
  }
}

filter {
  grok { match => { "message" => "^ETL Wrapper Initializing - %{DATESTAMP:JobStartTime}" } }
  grok { match => { "message" => "ETL Wrapper Finalizing - %{DATESTAMP:JobEndTime}" } }
  mutate { remove_field => ["message"] }
}

output {
  stdout { codec => rubydebug }
}
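One caveat with this approach (an assumption about the setup, not something verified against the original files): because the multiline pattern only flushes a buffered event when the next "ETL Wrapper Initializing" line arrives, the last job in a file can stay stuck in the codec's buffer. A sketch that adds auto_flush_interval for that case, plus a date filter to set @timestamp from JobStartTime, could look like this:

input {
  file {
    path => "C:/test/*.log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^ETL Wrapper Initializing"
      negate => true
      what => "previous"
      # flush the buffered lines if no new line arrives for 5 seconds,
      # so the final job in a file is emitted without waiting for a new header
      auto_flush_interval => 5
    }
  }
}

filter {
  grok { match => { "message" => "^ETL Wrapper Initializing - %{DATESTAMP:JobStartTime}" } }
  grok { match => { "message" => "ETL Wrapper Finalizing - %{DATESTAMP:JobEndTime}" } }
  # use the job start time as @timestamp instead of the ingest time
  date {
    match => ["JobStartTime", "MM/dd/yyyy HH:mm:ss"]
  }
  mutate { remove_field => ["message"] }
}

output {
  stdout { codec => rubydebug }
}

The date pattern MM/dd/yyyy HH:mm:ss matches the timestamps in the sample log above; adjust it if the real files differ.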

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.