Hello,
As a prerequisite, as @Badger stated, this can only work when enforcing a single worker. When the Java execution engine is used, we've detected that events get re-ordered internally due to the engine's execution model. In the documentation we've always somehow "ensured" this behavior when forcing 1 worker. The issue https://github.com/elastic/logstash/issues/10938 has been opened to guarantee the correct behavior of the aggregate filter when the number of workers is set to 1. We do not have an ETA for this fix.
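Besides the -w 1 command-line flag shown below, the single-worker constraint can also be pinned per pipeline in pipelines.yml, so it is not forgotten on a restart (the pipeline id and config path here are illustrative, not from your setup):

```yaml
- pipeline.id: aggregate-test
  path.config: "/path/to/test-aggregate.conf"
  pipeline.workers: 1
```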
I've tried to reproduce your pipeline. I had to apply a few modifications: I changed the task_id to use thread, as in the log I have no [application][name] or [host][name].
input {
  generator {
    lines => [
      "11:51:57,440 INFO [org.apache.cxf] (http-server1:18080-2) Inbound Message",
      "11:51:57,554 INFO [org.apache.cxf] (http-server1:18080-2) Outbound Message",
      "11:51:57,560 INFO [org.apache.cxf] (http-server2:18080-2) Inbound Message",
      "11:51:57,635 INFO [org.apache.cxf] (http-server2:18080-2) Outbound Message"
    ]
    count => 1
  }
}
filter {
  grok {
    match => [ "message", "%{TIME:time} %{WORD:log.level}[ \t]+%{GREEDYDATA:t_shortmessage}" ]
  }
  grok {
    match => [ "t_shortmessage", "\[%{JAVAFILE}\][ \t]+\(%{DATA:thread}\)[ \t]%{GREEDYDATA}" ]
  }
  if "Inbound Message" in [t_shortmessage] {
    aggregate {
      task_id => "app-%{thread}"
      code => "map['start_message'] = event.get('t_shortmessage'); map['start_time'] = event.get('@timestamp')"
      map_action => "create"
      timeout => 30
      timeout_tags => ['Aggregated']
      timeout_code => "event.set('transaction.time', 30 ); event.set('transaction.result', -1 )"
    }
  }
  if "Outbound Message" in [t_shortmessage] {
    aggregate {
      task_id => "app-%{thread}"
      code => "event.set('start_message', map['start_message']); event.set('transaction.time', (event.get('@timestamp')-map['start_time'])*1000)"
      map_action => "update"
      end_of_task => true
      add_tag => [ "Aggregated" ]
    }
  }
}
output {
  if "Aggregated" in [tags] {
    stdout { codec => rubydebug }
  }
}
You can run it with:
./bin/logstash -f test-aggregate.conf -w 1 --java-execution=true
./bin/logstash -f test-aggregate.conf -w 1 --java-execution=false
With this pipeline there's a major issue: @timestamp is not a representation of the time field you extract from the log line. transaction.time will just represent the amount of time Logstash took to process the lines. It is possible to parse time into an epoch, but as we're missing the full date, we would have issues with events spanning across two days.
In any case, the output is:
{
        "@timestamp" => 2019-07-24T17:51:09.777Z,
          "@version" => "1",
            "thread" => "http-server1:18080-2",
           "message" => "11:51:57,554 INFO [org.apache.cxf] (http-server1:18080-2) Outbound Message",
          "sequence" => 0,
  "transaction.time" => 21.0,
     "start_message" => "[org.apache.cxf] (http-server1:18080-2) Inbound Message",
         "log.level" => "INFO",
    "t_shortmessage" => "[org.apache.cxf] (http-server1:18080-2) Outbound Message",
              "tags" => [
        [0] "Aggregated"
    ],
              "time" => "11:51:57,554"
}
{
        "@timestamp" => 2019-07-24T17:51:09.778Z,
          "@version" => "1",
            "thread" => "http-server2:18080-2",
           "message" => "11:51:57,635 INFO [org.apache.cxf] (http-server2:18080-2) Outbound Message",
          "sequence" => 0,
  "transaction.time" => 0.0,
     "start_message" => "[org.apache.cxf] (http-server2:18080-2) Inbound Message",
         "log.level" => "INFO",
    "t_shortmessage" => "[org.apache.cxf] (http-server2:18080-2) Outbound Message",
              "tags" => [
        [0] "Aggregated"
    ],
              "time" => "11:51:57,635"
}