How can I merge two events in logstash?

I'm trying to parse a log file into elasticsearch through logstash.

I want to send the following log as single event(i.e. as a single document) into elasticsearch.

ETL Wrapper Initializing - 09/27/2018 06:33:57
Wrapper Information - 09/27/2018 06:33:57

Reading Component Log Port Files - 09/27/2018 06:34:53
-- >     -- > Found 3 files and only merge non-zero byte files
Renaming Reject Files - 09/27/2018 06:34:56
Sending Notifications - 09/27/2018 06:34:56
Setting Exit Status - 09/27/2018 06:34:56
ETL Wrapper Finalizing - 09/27/2018 06:34:56

Here is my logstash configuration:

input {
file {
path => "D:/logs/file.log"
start_position => "beginning"
grok {
match => {"message" => "ETL Wrapper Initializing - %{DATESTAMP:JobStartTime}"}
match => {"message" => "ETL Wrapper Finalizing - %{DATESTAMP:JobEndTime}"}
if "_grokparsefailure" in [tags]{
if [message] =~ /^$/ {
        drop { }
    remove_field => ["@version","host","message" ]
output {
elasticsearch {
    hosts => "http://localhost:9200/"
    index => "success_index"
stdout { codec => rubydebug }

Output of above configuration:

    "JobStartTime" => "09/27/2018 09:33:41",
      "@timestamp" => 2018-12-05T10:55:44.698Z,
            "path" => "D:/logs/file.log"
    "JobEndTime" => "09/27/2018 09:34:16",
    "@timestamp" => 2018-12-05T10:55:44.784Z,
          "path" => "D:/logs/file.log"

My expected output:

    "JobStartTime" => "09/27/2018 09:33:41",
      "@timestamp" => 2018-12-05T10:55:44.698Z,
            "path" => "D:/logs/file.log"
    "JobEndTime" => "09/27/2018 09:34:16"

How can I merge "JobStartTime" and "JobEndTime" into single document?

Any help is appreciable..
Thanks in advance.!


Hello vinodanumarla,
Just have a look to the Aggregate Filter plugin which allows to do such things.

Hello Michel99_7
Thanks for the response.

I have tried with Aggregate Filter. Below is my filter,

aggregate {
task_id => "%{path}"
code => "
map['path'] = event.get('path')
map['JobStartTime'] ||=
map['JobStartTime'] << {'JobStartTime' => event.get('JobStartTime')}
map['JobEndTime'] ||=
map['JobEndTime'] << {'JobEndTime' => event.get('JobEndTime')}
push_previous_map_as_event => true
timeout => 3

Below is the output:

"JobEndTime" => [
[0] {
"JobEndTime" => nil
[1] {
"JobEndTime" => "09/27/2018 09:34:16"
"@timestamp" => 2018-12-06T09:32:33.412Z,
"JobStartTime" => [
[0] {
"JobStartTime" => "09/27/2018 09:33:41"
[1] {
"JobStartTime" => nil
"path" => "D:/logs/20180927_093341_PDCDWG1040_lylty_acct_extract.log

Able to combine two docs as above. But, can we do it without creating lists like above i.e without separate mappings.
Is it possible to get the output as below?

"JobStartTime" => "09/27/2018 09:33:41",
"@timestamp" => 2018-12-05T10:55:44.698Z,
"path" => "D:/logs/file.log"
"JobEndTime" => "09/27/2018 09:34:16"


I'm trying to parse a log file into elasticsearch through logstash.

I want to send the following log as single event(i.e. as a single document) into elasticsearch.

Here is my log file looks like:

ETL Wrapper Initializing - 09/27/2018 06:33:57

Wrapper Information - 09/27/2018 06:33:57

Reading Component Log Port Files - 09/27/2018 06:34:53

-- > -- > Found 3 files and only merge non-zero byte files

Renaming Reject Files - 09/27/2018 06:34:56

Sending Notifications - 09/27/2018 06:34:56

Setting Exit Status - 09/27/2018 06:34:56

ETL Wrapper Finalizing - 09/27/2018 06:34:56

Here is my logstash configuration:

input {
file {
path => "D:/logs/file.log"
start_position => "beginning"
grok {
match => {"message" => "ETL Wrapper Initializing - %{DATESTAMP:JobStartTime}"}
match => {"message" => "ETL Wrapper Finalizing - %{DATESTAMP:JobEndTime}"}
if "_grokparsefailure" in [tags]{
if [message] =~ /^$/ {
drop { }
remove_field => ["@version","host","message" ]
output {
elasticsearch {
hosts => "http://localhost:9200/"
index => "success_index"
stdout { codec => rubydebug }
Output of above configuration:

"JobStartTime" => "09/27/2018 09:33:41",
"@timestamp" => 2018-12-05T10:55:44.698Z,
"path" => "D:/logs/file.log"
"JobEndTime" => "09/27/2018 09:34:16",
"@timestamp" => 2018-12-05T10:55:44.784Z,
"path" => "D:/logs/file.log"
My expected output:

"JobStartTime" => "09/27/2018 09:33:41",
"@timestamp" => 2018-12-05T10:55:44.698Z,
"path" => "D:/logs/file.log"
"JobEndTime" => "09/27/2018 09:34:16"
How can I merge "JobStartTime" and "JobEndTime" into single document?

Any help is appreciable.. Thanks in advance!


Hi Vino,

You can also use a multiline command in the input section:

input {
file {
path => "C:/test/*.log"
#sincedb_path => "/dev/null"
start_position => "beginning"
discover_interval => 5
close_older => 60
mode => "tail"
file_sort_by => "path"
codec => multiline {
pattern => "^ETL Wrapper Initializing"
negate => true
what => previous

filter {
grok { match => {"message" => "^ETL Wrapper Initializing - %{DATESTAMP:JobStartTime}"} }
grok { match => {"message" => "ETL Wrapper Finalizing - %{DATESTAMP:JobEndTime}"} }
mutate { remove_field => ["message"] }

output {
stdout {codec => rubydebug}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.