I would ingest the entire file as a single event and then process it in two different ways. With any recent release of Logstash you would do that using multiple pipelines and a forked-path pattern. However, I will use the old-school method: clone the event and process each copy differently within a single pipeline.
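To get the whole file into a single event you can use a file input with a multiline codec whose pattern matches nothing in the file. This is only a sketch; the path and the never-matching pattern are placeholders to adapt:

input {
    file {
        path => "/path/to/report.txt"       # placeholder path
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => multiline {
            pattern => "^Spalanzani"        # assumed never to match,
            negate => true                  # so every line is appended
            what => "previous"              # to the same event
            auto_flush_interval => 2        # flush once input goes idle
        }
    }
}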
Note the use of literal newlines in patterns.
grok { match => { "message" => "^DATE: %{DATA:[@metadata][date]}
" } }
date { match => [ "[@metadata][date]", "MMM dd, yyyy" ] }
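For example, if the file starts with a hypothetical header line such as

DATE: Mar 05, 2024

then grok stores "Mar 05, 2024" in [@metadata][date] (parsed but never indexed, since @metadata is not written to outputs), and the date filter uses the "MMM dd, yyyy" pattern to set @timestamp from it.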
# Emit a copy of the event with [type] set to "section2".
clone { clones => [ "section2" ] }
# The original event (which never gets [type] set) extracts the totals;
# the clone parses the section 2 table.
if [type] != "section2" {
    grok {
        # Both patterns run against the same message; break_on_match =>
        # false makes grok keep trying patterns after the first match.
        match => { "message" => [ "TOTAL\s+(?<total>[0-9,]+)", "SUBTOTAL\s+(?<subtotal>[0-9,]+)" ] }
        break_on_match => false
        remove_field => [ "message" ]
    }
} else {
    # (?m) makes . match newlines in Ruby regexps, so each pattern can
    # span the whole message. The first gsub deletes everything from the
    # TOTAL line onwards; the second deletes everything up to and
    # including the "section 2 ****" header line.
    mutate { gsub => [ "message", "(?m)^TOTAL.*", "", "message", "(?m).*section 2 \*+\n", "" ] }
    # Split what is left into an array of lines (note the literal newline).
    mutate { split => { "message" => "
" } }
    ruby {
        code => '
            events = []
            msg = event.get("message")
            # The first line of the table is the header row; the
            # remaining lines are data rows paired with those headers.
            msg.each_index { |x|
                if x == 0
                    @column_names = msg[x].split(/\t/)
                else
                    columns = msg[x].split(/\t\s+/)
                    events << { @column_names[0] => columns[0],
                                @column_names[1] => columns[1],
                                @column_names[2] => columns[2] }
                end
            }
            event.set("events", events)
        '
        remove_field => [ "message" ]
    }
    # Turn the [events] array into one event per table row.
    split { field => "events" }
}
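To make the else branch concrete, suppose (purely as a hypothetical example) that after the gsubs the message contains a table whose columns are separated by a tab followed by alignment spaces, which is what the /\t\s+/ split expects:

Name<TAB>Count<TAB>Amount
foo<TAB>    1<TAB>  100
bar<TAB>    2<TAB>  200

The ruby filter takes the first line as the column names and turns each data row into a hash, so [events] becomes [ { "Name" => "foo", "Count" => "1", "Amount" => "100" }, { "Name" => "bar", "Count" => "2", "Amount" => "200" } ], and split then emits one event per row.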
You would use the same 'if [type] != "section2"' conditional in the output section to decide which output each event is written to.
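For example, a minimal sketch of such an output section (the index names here are placeholders):

output {
    if [type] != "section2" {
        elasticsearch { index => "report-totals" }      # placeholder
    } else {
        elasticsearch { index => "report-section2" }    # placeholder
    }
}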
If you want the columns at the top level of the event instead of nested under [events], then see this.
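The usual technique there is another ruby filter that copies every key of the [events] object to the root of the event and then deletes [events]. A minimal sketch, assuming [events] is a flat hash after the split:

ruby {
    code => '
        # Promote each column to the top level of the event,
        # then drop the now-redundant [events] object.
        event.get("events").each { |k, v| event.set(k, v) }
        event.remove("events")
    '
}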