Hello,
I am monitoring a Java application with the Elastic agent and creating some custom transactions with the Java APM client.
I configured the APM server to write all data to a file, so it can be collected later.
I wrote a Logstash pipeline configuration that reads the file, uses some filters to aggregate the data, and then outputs it to Elasticsearch so we can analyze the data in Kibana.
The system we want to monitor has different states, and we create events for state changes. In Logstash we want to attach the state information to the other events, so in Kibana we can easily filter the events by the state in which they happened.
Moreover, we want to add the time elapsed between two events of certain types.
We use the aggregate filter for this.
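To illustrate with simplified events (shortened, not the real documents), this is the enrichment we are after:

# input, in file order:
{"@timestamp":"14:00:00.000","transaction":{"type":"state"},"labels":{"state":"A"}}
{"@timestamp":"14:00:05.000","transaction":{"type":"nav"}}
{"@timestamp":"14:00:08.000","transaction":{"type":"nav"}}

# desired result:
# first "nav"  -> state=A (copied from the last "state" event), no duration yet
# second "nav" -> state=A, duration=3.0 (seconds since the previous "nav")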
The problem is that the events from the file do not seem to be processed in order, which makes the aggregation faulty.
From what I found online, Logstash has to run with a single worker to keep events in order, so I set pipeline.workers: 1 in the logstash.yml file. I also started Logstash with the parameter -w 1 to be sure only one worker is used.
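For reference, these are the exact settings (the pipeline file name here is just an example):

# logstash.yml
pipeline.workers: 1

# command line (Windows)
bin\logstash.bat -w 1 -f apm-pipeline.conf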
However, this does not solve the issue.
The input file looks something like this:
{"@timestamp":"2020-04-01T14:00:08.556Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":20},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"0f79c4d3f204c35a","type":"lang","sampled":true}}
{"@timestamp":"2020-04-01T14:00:08.558Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":79},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"a35c3da608d49839","type":"user","sampled":true}}
{"@timestamp":"2020-04-01T14:00:08.560Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":20},"name":"unnamed","id":"7918695d672c9b26","span_count":{"dropped":0,"started":0},"type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:00:26.036Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":23},"name":"unnamed","id":"37c8a14d002e9bae","span_count":{"dropped":0,"started":0},"type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:00:27.023Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":39},"name":"unnamed","id":"45033b85b17926ce","span_count":{"dropped":0,"started":0},"type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:00:56.229Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":25},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"144903716325a3af","type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:01.645Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":14},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"31b008b95200b597","type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:01:04.762Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":29},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"94557c57d36ed289","type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:06.231Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":22},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"f9a6e8949b41134a","type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:01:09.824Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":27},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"8f6983f8946beeb5","type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:10.871Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":420},"name":"unnamed","id":"ef0bb6c63c9842dc","span_count":{"dropped":0,"started":0},"type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:15.763Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":52},"name":"unnamed","id":"9366e6c813135893","span_count":{"dropped":0,"started":0},"type":"nav","sampled":true}}
The Logstash config looks like this:
input {
  file {
    path => "C:/apm-server/*"
  }
}

filter {
  # parse the event JSON, remove the raw message after parsing
  json {
    source => "message"
    remove_field => [ "message" ]
  }

  # only enrich APM transaction events, skip everything else
  if [processor][event] == "transaction" {
    if [transaction][type] == "state" {
      mutate {
        add_tag => ["stateChange"]
        add_field => {
          "state" => "%{[labels][state]}"
        }
        remove_field => [ "labels" ]
      }
    }

    # add observer_id field (copied from observer.id) so it can be used
    # as the task identifier in the aggregate filters
    mutate {
      add_field => {
        "observer_id" => "%{[observer][id]}"
      }
    }

    if [transaction][type] == "state" {
      # remember the current state in the shared map
      aggregate {
        task_id => "%{observer_id}"
        code => "
          map['state'] = event.get('state')
        "
      }
    } else if [transaction][type] == "nav" {
      # attach the remembered state and the time since the previous nav event
      aggregate {
        task_id => "%{observer_id}"
        code => "
          event.set('state', map['state'])
          if map['lastTime'] != nil
            event.set('duration', event.get('@timestamp') - map['lastTime'])
          end
          map['lastTime'] = event.get('@timestamp')
        "
      }
    }

    # counter to show the order in which events are processed, for debugging
    aggregate {
      task_id => "%{observer_id}"
      code => "
        if map['count'] == nil
          map['count'] = 0
        end
        event.set('count', map['count'])
        map['count'] = map['count'] + 1
      "
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "changeme"
  }
  stdout { codec => json }
}
I added the final aggregate filter with a counter to show me the order in which the events are processed, and the count is not in the same order as the events appear in the file.
This also means that the "nav" events get the wrong 'state' assigned to them, and the time difference in 'duration' is sometimes wrong.
In Kibana it then looks something like this:
transaction.type | state | duration | count
lang             | -     | -        | 0
user             | -     | -        | 1
state            | A     | -        | 2
state            | B     | -        | 3
nav              | B     | -48.74   | 11
nav              | B     | -        | 6
state            | C     | -        | 4
nav              | B     | 8.533    | 7
state            | B     | -        | 5
nav              | B     | 5.062    | 8
nav              | B     | 1.047    | 9
nav              | B     | 4.892    | 10
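If I read the counts right, the negative duration matches the reordering: the "nav" event with timestamp 14:00:27.023 was processed last (count 11), after the "nav" event from 14:01:15.763, so its duration was computed as 14:00:27.023 - 14:01:15.763 = -48.74 s.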
I am not sure how the Logstash pipeline works internally.
Can events overtake each other with the conditionals I have in there, given that not all events pass through all the filters?
What am I doing wrong?
Thanks!