Hello,
I am new to using Logstash and its filters. I have log files that look like this:
</>
Started 'DB to PostgreSQL transfer' workflow at 2024.05.19 09:31:07
database v87_orig_rec_r8904_03
########################################################
timestamp | module name | result
________________________________________________________
2024.05.19 13:37:06 | DbToPostgres | success
2024.05.19 13:00:49 | PgDump | success
2024.05.19 09:43:30 | Converter | success
2024.05.19 10:00:40 | Import | success
2024.05.19 10:29:59 | Validation | failure
2024.05.19 12:31:40 | DbUpdate | success
2024.05.19 12:45:46 | Final | success
2024.05.19 11:21:25 | CreateArchive | failure
2024.05.19 13:37:02 | DbInsert | success
2024.05.19 11:35:37 | GenerateBucket | success
2024.05.19 09:36:10 | PrepareDB | success
Finishing workflow at 2024.05.19 13:37:06
</>
I combine these lines into one multiline event in Filebeat. This is the original event:
</>
"original" => "Started 'DB to PostgreSQL transfer' workflow at 2024.05.19 09:31:07\n\ndatabase v87_orig_rec_r8904_03\n########################################################\n\ntimestamp | module name | result\n________________________________________________________\n2024.05.19 13:37:06 | DbToPostgres | success\n2024.05.19 13:00:49 | PgDump | success\n2024.05.19 09:43:30 | Converter | success\n2024.05.19 10:00:40 | Import | success\n2024.05.19 10:29:59 | Validation | failure\n2024.05.19 12:31:40 | DbUpdate | success\n2024.05.19 12:45:46 | Final | success\n2024.05.19 11:21:25 | CreateArchive | failure\n2024.05.19 13:37:02 | DbInsert | success\n2024.05.19 11:35:37 | GenerateBucket | success\n2024.05.19 09:36:10 | PrepareDB | success\n\nFinishing workflow at 2024.05.19 13:37:06"
</>
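For context, the Filebeat input that builds this multiline event looks roughly like this (a sketch; the path is a placeholder):
</>
- type: log
  paths:
    - /var/log/importer/*.log      # placeholder path
  fields:
    logtype: importer
  multiline:
    pattern: '^Started '           # a new event begins at each "Started ..." line
    negate: true
    match: after
</>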
I would like to capture all matches from the search pattern(s) in one message, so that one log appears in Elasticsearch/OpenSearch as one hit. That works so far, but only for search patterns that find a single match. A search pattern that matches multiple lines (like 2024.05.19 13:37:06 | DbToPostgres | success and so on) only captures the first match when I use this pattern:
</>
"Started 'DB to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n(?m)(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}%{GREEDYDATA:message}Finishing workflow at (?<workflow_finish_timestamp>%{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time})".
</>
When I use this pattern:
</>
"Started 'DB to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n((?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n)+%{GREEDYDATA:message}Finishing workflow at %{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time}"
</>
it only captures the last of the multiple matches (2024.05.19 09:36:10 | PrepareDB). As far as I understand, the regex engine keeps only the value from the final iteration when a capture group sits inside a repeated (...)+ construct. I would like to capture all log timestamps, module names and results in arrays like this:
</>
"workflow_start_year" => "2024",
"log_timestamp" => [
[0] "2024.05.19 13:37:06",
[1] "2024.05.19 13:00:49",
[2] "2024.05.19 09:43:30",
[3] "2024.05.19 10:00:40"
],
"workflow_start_day" => "19",
"log.date" => [
[0] "2024.05.19",
[1] "2024.05.19",
[2] "2024.05.19",
[3] "2024.05.19"
],
"module_name" => [
[0] "DbToPostgres",
[1] "PgDump",
[2] "Converter",
[3] "Import"
],
"log_day" => [
[0] "19",
[1] "19",
[2] "19",
[3] "19"
],
</>
That only works if I repeat the pattern multiple times, like this:
</>
"Started 'DB2 to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*|\smodule name\s|\sresult\n_+\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*|\s*%{WORD:module_name}\s*|\s*%{WORD:result}\n%{GREEDYDATA:message}Finishing workflow at %{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time}"
</>
The problem is that the number of entries in the table (timestamp | module name | result) varies from log to log, so I cannot chain endless copies of the same search pattern one after another to capture all entries.
Here is the complete filter section:
</>
filter {
  if [fields][logtype] == 'importer' {
    grok {
      match => {
        "message" => [
          "Started 'DB to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR:workflow_start_year}.%{MONTHNUM:workflow_start_month}.%{MONTHDAY:workflow_start_day} %{HOUR:workflow_start_hour}:%{MINUTE:workflow_start_minute}:%{SECOND:workflow_start_second})\n\ndatabase(?:%{DATA:DB_VERSION}%{DATA:DB_TYPE}%{DATA:DB_TYPE_NEW}%{DATA:DB_ID}%{NUMBER:REV_NO})\n#+\n\ntimestamp\s*\|\smodule name\s\|\sresult\n_+\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*\|\s*%{WORD:module_name}\s*\|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*\|\s*%{WORD:module_name}\s*\|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*\|\s*%{WORD:module_name}\s*\|\s*%{WORD:result}\n(?<log_timestamp>(?<log.date>%{YEAR:log_year}.%{MONTHNUM:log_month}.%{MONTHDAY:log_day})\s*(?<log.time>%{HOUR:log_hour}:%{MINUTE:log_minute}:%{SECOND:log_second}))\s*\|\s*%{WORD:module_name}\s*\|\s*%{WORD:result}\n%{GREEDYDATA:message}Finishing workflow at %{YEAR:workflow_finish_year}.%{WORD:workflow_finish_month}.%{WORD:workflow_finish_day} %{TIME:workflow_finish_time}"
        ]
      }
      break_on_match => false
      overwrite => [ "message" ]
    }
  }
}
</>
I also tried the search pattern with the multiline flag (?m) and with "(SEARCH_PATTERN)+", but then only one of the table entries is captured.
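One direction I have been considering is to let grok capture only the single-match fields plus the whole table as one table_rows field (an illustrative name), and then build the arrays from that field afterwards, for example with the ruby filter sketched further below:
</>
grok {
  match => {
    "message" => "(?m)Started 'DB to PostgreSQL transfer' workflow at (?<workflow_start_timestamp>%{YEAR}\.%{MONTHNUM}\.%{MONTHDAY} %{TIME})\n\ndatabase %{NOTSPACE:database_name}\n#+\n\ntimestamp\s*\|\s*module name\s*\|\s*result\n_+\n%{GREEDYDATA:table_rows}\n\nFinishing workflow at (?<workflow_finish_timestamp>%{YEAR}\.%{MONTHNUM}\.%{MONTHDAY} %{TIME})"
  }
}
</>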
How can I define the grok filter to capture all entries in a single event? How can all entries that match the same search pattern be stored in arrays?
I don't want to split the multiline event into individual lines; I have already tested that approach successfully, but I want one event per workflow log.
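What I can get to work is a ruby filter that scans the message for table rows and builds the arrays myself (a minimal sketch; the regex assumes the exact date and separator format shown above), but I was hoping grok alone could do it:
</>
ruby {
  code => '
    # Collect every "timestamp | module | result" row from the multiline message.
    rows = event.get("message").to_s.scan(/^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2})\s*\|\s*(\w+)\s*\|\s*(\w+)\s*$/)
    unless rows.empty?
      event.set("log_timestamp", rows.map { |r| r[0] })
      event.set("module_name",   rows.map { |r| r[1] })
      event.set("result",        rows.map { |r| r[2] })
    end
  '
}
</>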
Best regards