How to capture all entries that match the same search pattern

Hello,

I am new to using Logstash and its filters. I have log files that look like this:

</>
Started 'DB to PostgreSQL transfer' workflow at 2024.05.19 09:31:07

database v87_orig_rec_8904_03
########################################################

timestamp | module name | result
________________________________________________________
2024.05.19 13:37:06 | DbToPostgres | success
2024.05.19 13:00:49 | PgDump | success
2024.05.19 09:43:30 | Converter | success
2024.05.19 10:00:40 | Import | success
2024.05.19 10:29:59 | Validation | failure
2024.05.19 12:31:40 | DbUpdate | success
2024.05.19 12:45:46 | Final | success
2024.05.19 11:21:25 | CreateArchive | failure
2024.05.19 13:37:02 | DbInsert | success
2024.05.19 11:35:37 | GenerateBucket | success
2024.05.19 09:36:10 | PrepareDB | success

Finishing workflow at 2024.05.19 13:37:06

</>
I created a multiline configuration in Filebeat. This is the original event:

</>
"original" => "Started 'DB to PostgreSQL transfer' workflow at 2024.05.19 09:31:07\n\ndatabase v87_orig_rec_r8904_03\n########################################################\n\ntimestamp | module name | result\n________________________________________________________\n2024.05.19 13:37:06 | DbToPostgres | success\n2024.05.19 13:00:49 | PgDump | success\n2024.05.19 09:43:30 | Converter | success\n2024.05.19 10:00:40 | Import | success\n2024.05.19 10:29:59 | Validation | failure\n2024.05.19 12:31:40 | DbUpdate | success\n2024.05.19 12:45:46 | Final | success\n2024.05.19 11:21:25 | CreateArchive | failure\n2024.05.19 13:37:02 | DbInsert | success\n2024.05.19 11:35:37 | GenerateBucket | success\n2024.05.19 09:36:10 | PrepareDB | success\n\nFinishing workflow at 2024.05.19 13:37:06"
</>
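(For reference, the multiline setup itself is not shown above. A sketch along these lines — the exact pattern is an assumption on my part — would assemble everything from one "Started ..." header up to the next into a single event:)

</>
# filebeat.yml (sketch): every line that does NOT start a new
# "Started ..." header is appended to the previous event
multiline.pattern: '^Started '
multiline.negate: true
multiline.match: after
</>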

I would like to capture all matches from the search pattern(s) in one message, so that one log appears in Elastic/OpenSearch as one hit. That works so far, but only for search patterns which find a single match. The search pattern which matches multiple lines (like 2024.05.19 13:37:06 | DbToPostgres | success and so on) only captures the first match when I use this pattern:

</>
"Started 'DB to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n(?m)(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}%{GREEDYDATA:message}Finishing workflow at (?<workflow_finish_timestamp>%{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time})".
</>

When I use this pattern:

</>
"Started 'DB to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n((?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n)+%{GREEDYDATA:message}Finishing workflow at %{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time}"

</>

it only captures the last of the multiple matches (2024.05.19 09:36:10 | PrepareDB). I would like to capture all log timestamps, module names and results in arrays, like this:

</>
"workflow_start_year" => "2024",
"log_timestamp" => [
[0] "2024.05.19 13:37:06",
[1] "2024.05.19 13:00:49",
[2] "2024.05.19 09:43:30",
[3] "2024.05.19 10:00:40"
],
"workflow_start_day" => "19",
"log.date" => [
[0] "2024.05.19",
[1] "2024.05.19",
[2] "2024.05.19",
[3] "2024.05.19"
],
"module_name" => [
[0] "DbToPostgres",
[1] "PgDump",
[2] "Converter",
[3] "Import"
],
"log_day" => [
[0] "19",
[1] "19",
[2] "19",
[3] "19"
],

</>

That only works if I repeat the pattern multiple times, like this:

</>
"Started 'DB2 to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n%{GREEDYDATA:message}Finishing workflow at %{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time}"

</>

The problem is that the number of entries in the table (logtimestamp | module name | result) varies from log to log, and I cannot chain endless copies of the same search pattern one after another to capture all entries.

Here is the complete filter section:

</>

filter {
if [fields][logtype] == 'importer' {
grok {
match => {
"message" => [
"Started 'DB2 to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n%{GREEDYDATA:message}Finishing workflow at %{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time}"
]
}
break_on_match => false
overwrite => [ "message" ]
}
}
}

</>

I also tried the search pattern with the multiline flag (?m) and with "(SEARCH_PATTERN)+", but still only one match of the table entries is captured.
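(As far as I understand, the underlying limitation is that a regex group repeated with "+" retains only its final capture, which is why grok cannot build arrays this way. Ruby's String#scan, on the other hand, returns every match, which is what a Logstash ruby filter could use. A minimal standalone sketch — the regex and field names here are illustrative, not my actual filter:)

</>
# A group repeated with "+" keeps only its last capture; String#scan
# returns one [timestamp, module, result] triple per matching line.
sample = "2024.05.19 13:37:06 | DbToPostgres | success\n" \
         "2024.05.19 13:00:49 | PgDump | success\n"

row = /^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}) \| (\w+) \| (\w+)$/
matches = sample.scan(row)

log_timestamps = matches.map { |m| m[0] }
module_names   = matches.map { |m| m[1] }
results        = matches.map { |m| m[2] }

# Inside a Logstash ruby filter this would become, e.g.:
#   event.set("log_timestamp", log_timestamps)
#   event.set("module_name",   module_names)
#   event.set("result",        results)
</>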

How can I define the Grok filter to capture all entries in a single event? How can all entries that match the same search filter be saved in lists?

I don't want to split the multiline event into individual lines; I have already tested that approach successfully.
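(For comparison, a split-based variant would look roughly like this — a sketch only, with an illustrative per-line grok pattern:)

</>
filter {
  split {
    # cut the multiline event back into one event per line;
    # "\n" is the default terminator
    terminator => "\n"
  }
  grok {
    # a single per-line pattern then suffices (illustrative)
    match => { "message" => "%{GREEDYDATA:log_timestamp} \| %{WORD:module_name} \| %{WORD:result}" }
  }
}
</>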

Best regards
