Logstash events out of order

Hello,

I am monitoring a Java application with the Elastic agent and creating some custom transactions using the Java APM client.
I configured the APM server to write all data to a file so it can be collected later.
I wrote a Logstash pipeline configuration that reads the file, uses some filters to aggregate the data, and then outputs it to Elasticsearch so we can analyze the data with Kibana.

The system we want to monitor has different states, and we create events for state changes. In Logstash we want to attach the state information to the other events, so that in Kibana we can easily filter the events by the state they happened in.
Moreover, we want to add the time elapsed between two events of certain types.
We use the aggregate filter for this.

The problem is that the events from the file do not seem to be processed in order, which makes the aggregation faulty.
From what I have found online, Logstash needs to be configured to run with only one worker, which I did in the "logstash.yml" file by setting pipeline.workers: 1.
I also started Logstash with the parameter "-w 1" to be sure only one worker is used.
However, this does not solve the issue.
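
For reference, the single-worker setup described above looks like this (the pipeline file name is just a placeholder):

    # logstash.yml -- force a single pipeline worker
    pipeline.workers: 1

or, equivalently, on the command line: bin/logstash -w 1 -f my-pipeline.conf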

Input File looks something like this:

{"@timestamp":"2020-04-01T14:00:08.556Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":20},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"0f79c4d3f204c35a","type":"lang","sampled":true}}
{"@timestamp":"2020-04-01T14:00:08.558Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":79},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"a35c3da608d49839","type":"user","sampled":true}}
{"@timestamp":"2020-04-01T14:00:08.560Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":20},"name":"unnamed","id":"7918695d672c9b26","span_count":{"dropped":0,"started":0},"type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:00:26.036Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":23},"name":"unnamed","id":"37c8a14d002e9bae","span_count":{"dropped":0,"started":0},"type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:00:27.023Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":39},"name":"unnamed","id":"45033b85b17926ce","span_count":{"dropped":0,"started":0},"type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:00:56.229Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":25},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"144903716325a3af","type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:01.645Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":14},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"31b008b95200b597","type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:01:04.762Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":29},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"94557c57d36ed289","type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:06.231Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":22},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"f9a6e8949b41134a","type":"state","sampled":true}}
{"@timestamp":"2020-04-01T14:01:09.824Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":27},"name":"unnamed","span_count":{"dropped":0,"started":0},"id":"8f6983f8946beeb5","type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:10.871Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":420},"name":"unnamed","id":"ef0bb6c63c9842dc","span_count":{"dropped":0,"started":0},"type":"nav","sampled":true}}
{"@timestamp":"2020-04-01T14:01:15.763Z", ... ,"processor":"{"name":"transaction","event":"transaction"}", ... ,"transaction":"{"duration":{"us":52},"name":"unnamed","id":"9366e6c813135893","span_count":{"dropped":0,"started":0},"type":"nav","sampled":true}}

The Logstash config looks like this:

    input {
      file {
        path => "C:/apm-server/*"
      }
    }
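
As a side note, to read a file that already exists from the beginning, the file input can be configured like this (a sketch; the sincedb_path value is an assumption for Windows):

    input {
      file {
        path => "C:/apm-server/*"
        start_position => "beginning"  # read existing lines from the top of the file
        sincedb_path => "NUL"          # Windows null device, so no read position is persisted between runs
      }
    }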

    filter {
      # parse the event JSON, remove the raw message after parsing
      json {
        source => "message"
        remove_field => [ "message" ]
      }

      # filter out other APM events
      if [processor][event] == "transaction" {

        if [transaction][type] == "state" {
          mutate {
            add_tag => ["stateChange"]
            add_field => {
              "state" => "%{[labels][state]}"
            }
            remove_field => [ "labels" ]
          }
        }

        # add an observer_id field from observer.id so it can be used as the task_id in aggregate filters
        mutate {
          add_field => {
            "observer_id" => "%{[observer][id]}"
          }
        }

        if [transaction][type] == "state" {
          aggregate {
            task_id => "%{observer_id}"
            code => "
              map['state'] = event.get('state')
            "
          }
        } else if [transaction][type] == "nav" {
          aggregate {
            task_id => "%{observer_id}"
            code => "
              event.set('state', map['state'])
              if map['lastTime'] != nil
                event.set('duration', event.get('@timestamp') - map['lastTime'])
              end
              map['lastTime'] = event.get('@timestamp')
            "
          }
        }

        # filter to count events, for debugging
        aggregate {
          task_id => "%{observer_id}"
          code => "
            if map['count'] == nil
              map['count'] = 0
            end
            event.set('count', map['count'])
            map['count'] = map['count'] + 1
          "
        }
      }
    }

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        user => "elastic"
        password => "changeme"
    }
    stdout { codec => json }
}

I added the final aggregate filter to attach a counter that shows me in what order the events are processed, and the count is not in the same order as the events appear in the file.
This also means that the "nav" events get the wrong 'state' assigned to them, and the time difference in 'duration' is sometimes wrong.

In kibana it then looks something like this:

transaction.type | state | duration | count
lang             |   -   |    -     |   0
user             |   -   |    -     |   1
state            |   A   |    -     |   2
state            |   B   |    -     |   3
nav              |   B   |  -48.74  |   11
nav              |   B   |    -     |   6
state            |   C   |    -     |   4
nav              |   B   |   8.533  |   7
state            |   B   |    -     |   5
nav              |   B   |   5.062  |   8
nav              |   B   |   1.047  |   9
nav              |   B   |   4.892  |   10

I am not sure how the Logstash pipeline works internally.
Can events overtake each other with the conditionals I have in there, given that not all events are handled by all the filters?

What am I doing wrong?

Thanks!

Try disabling java_execution.
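
For anyone finding this later: assuming a Logstash 7.x release where the Java execution engine can still be switched off, that setting lives in logstash.yml:

    # logstash.yml -- fall back to the Ruby execution engine
    pipeline.java_execution: false

Newer releases (7.7 and later) also added a pipeline.ordered setting that is meant to preserve event order when pipeline.workers is 1.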

Thank you!

This seems to make it better: now the events are processed in the order the lines appear in the file.
However, it seems that the APM server also does not write the events into the file in order.
I implemented my own little program that sorts the lines by timestamp before handing the file over to Logstash; with that, it seems to work.
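
A minimal sketch of such a pre-sort step (an illustration, not my exact program; the file paths are placeholders):

    # sort_by_timestamp.rb -- sort an NDJSON file by its @timestamp field.
    # ISO 8601 timestamps sort correctly as plain strings.
    require 'json'

    lines  = File.readlines('C:/apm-server/apm.ndjson')               # placeholder input path
    sorted = lines.sort_by { |line| JSON.parse(line)['@timestamp'] }
    File.write('C:/apm-server/apm-sorted.ndjson', sorted.join)        # placeholder output path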
