Aggregate: end_of_task in a conditional block?


#1

Hello everyone,

I'm trying to aggregate the log lines produced by a file-access monitor into essentially two lines per file.
A quick example will make it clear.

The input looks like this:

op:open user:userx path:pathx
op:open user:userx path:pathx
op:read user:userx path:pathx
op:read user:userx path:pathx
op:read user:userx path:pathx
op:stat user:userx path:pathx
op:modified user:userx path:pathx
op:write user:userx path:pathx
op:write user:userx path:pathx
op:write user:userx path:pathx
op:write user:userx path:pathx
op:close user:userx path:pathx
op:close user:userx path:pathx

and the result should look like this:

op:open user:userx path:pathx
op:close user:userx path:pathx threads:2 reads:3 writes:4 stats:1 modified:1

Multiple threads can work on the same file, hence the multiple "open" and "close" operations.
The task_id combines the file path and the username, i.e. task_id => "%{path}_%{username}"

Now to the problem: the task is only over when the last close operation takes place, so I can't simply put "end_of_task" in the "close" block, since the first close would end the task while other threads still have the file open. My idea was to count the "open" ops, then count down on each "close" op, and when the counter reaches 0 (i.e. it's the last "close" op) set "end_of_task". How can this be done from within the Ruby code block?

For example:

if [op] == "open" {
  aggregate {
    task_id => "%{path}_%{username}"
    code => "
      if !map['open_num']
        map['open_num'] = 1
      else
        map['open_num'] += 1
        # show the first open, cancel the rest
        event.cancel()
      end
    "
  }
}

...

if [op] == "close" {
  aggregate {
    task_id => "%{path}_%{username}"
    code => "
      map['close_num'] ||= 0
      map['close_num'] += 1
      if map['open_num'] && map['open_num'] > 1
        map['open_num'] -= 1
        event.cancel()
      else
        event.set('writes', map['writes'])
        event.set('reads', map['reads'])
        event.set('lseeks', map['lseeks'])
        event.set('threads', map['close_num'])
        # set the end_of_task variable here??
      end
    "
    timeout => 600
    map_action => "update"
  }
}
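For illustration, here is a minimal sketch of the counting idea in plain Ruby, outside Logstash. It assumes events arrive in order (one worker) and that a Hash keyed by task_id stands in for the aggregate filter's per-task maps; `COUNTERS`, `parse` and `aggregate` are hypothetical names, and the task_id is built from the raw `user` key of the input lines. Deleting the task's entry on the last close plays the role that end_of_task would.

```ruby
# Minimal sketch of the open/close counting idea, outside Logstash.
# ASSUMPTIONS: events arrive in order (single worker); a Hash keyed by
# task_id stands in for the aggregate filter's per-task maps.

# Maps an op to the counter it increments (hypothetical names).
COUNTERS = { 'read' => 'reads', 'write' => 'writes',
             'stat' => 'stats', 'modified' => 'modified' }.freeze

# Parse "op:open user:userx path:pathx" into {"op"=>"open", ...}.
def parse(line)
  line.split.map { |pair| pair.split(':', 2) }.to_h
end

# Returns the lines that would be emitted for the given input lines.
def aggregate(lines)
  maps = {} # task_id => Hash of counters
  out = []
  lines.each do |line|
    ev = parse(line)
    task_id = "#{ev['path']}_#{ev['user']}"
    case ev['op']
    when 'open'
      first = !maps.key?(task_id)
      map = (maps[task_id] ||= Hash.new(0))
      map['open_num'] += 1 # handles currently open
      map['threads'] += 1  # handles opened over the task's lifetime
      out << line if first # show the first open, drop the rest
    when 'close'
      map = maps[task_id]
      next if map.nil?     # close without a known open: ignore it
      map['open_num'] -= 1
      next if map['open_num'] > 0 # another thread still has the file open
      summary = %w[threads reads writes stats modified].map { |k| "#{k}:#{map[k]}" }
      out << "#{line} #{summary.join(' ')}"
      maps.delete(task_id) # last close: drop the task, like end_of_task
    else
      key = COUNTERS[ev['op']]
      maps[task_id][key] += 1 if key && maps.key?(task_id)
    end
  end
  out
end

INPUT = <<~LOG.lines.map(&:chomp)
  op:open user:userx path:pathx
  op:open user:userx path:pathx
  op:read user:userx path:pathx
  op:read user:userx path:pathx
  op:read user:userx path:pathx
  op:stat user:userx path:pathx
  op:modified user:userx path:pathx
  op:write user:userx path:pathx
  op:write user:userx path:pathx
  op:write user:userx path:pathx
  op:write user:userx path:pathx
  op:close user:userx path:pathx
  op:close user:userx path:pathx
LOG

puts aggregate(INPUT)
# prints:
# op:open user:userx path:pathx
# op:close user:userx path:pathx threads:2 reads:3 writes:4 stats:1 modified:1
```

Because the whole task entry is removed on the last close, a later open of the same file starts a fresh count instead of being cancelled as a "second" open.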

Alternatively, it would probably suffice to delete the aggregate map, but how can this be done?
I've tried something like map['open_num'].clear, but that didn't work.
Ideas?
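On the map['open_num'].clear attempt, a minimal sketch, assuming the aggregate filter's `map` behaves like a plain Ruby Hash: `map['open_num']` is an Integer, which has no `clear` method, so clearing or deleting has to be done on the Hash itself. The sample counter values are made up.

```ruby
# ASSUMPTION: the aggregate filter's `map` is a plain Ruby Hash.
map = { 'open_num' => 1, 'close_num' => 2, 'writes' => 4, 'reads' => 3 }

# map['open_num'] is an Integer; Integer has no #clear method.
begin
  map['open_num'].clear
rescue NoMethodError => e
  puts "NoMethodError: #{e.message}"
end

# Remove a single counter from the map:
map.delete('open_num')

# Or reset every counter at once by emptying the Hash in place:
map.clear
puts map.empty? # prints "true"
```

Note that `map = {}` inside the code string would only rebind the local name, not empty the stored map. Also, as far as I understand the plugin, clearing the map resets the counters but does not end the task: the (now empty) map stays around until end_of_task or the timeout, so the next open for the same task_id would simply start counting from scratch.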


(Ry Biesemeyer) #2

The order in which Logstash processes events cannot be guaranteed. There are things that can be done to increase the likelihood of strict ordering (such as removing parallelism by setting the pipeline's worker count to 1, which significantly affects performance), but we cannot guarantee that events will be fully processed in the order in which they are received.
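To make the ordering concern concrete, a minimal plain-Ruby sketch with two hypothetical event sequences: a naive open/close counter fires on the last close when events are in order, but fires too early when the second thread's open is processed after the first close.

```ruby
# Returns the index of the event at which a naive open/close counter would
# declare the file fully closed, or nil if it never does.
def closed_at(ops)
  open = 0
  ops.each_with_index do |op, i|
    open += 1 if op == 'open'
    open -= 1 if op == 'close'
    return i if op == 'close' && open <= 0
  end
  nil
end

in_order  = %w[open open read close write close]
reordered = %w[open read close open write close] # second open processed late

puts closed_at(in_order)  # prints 5 (the real last close)
puts closed_at(reordered) # prints 2 (before the second thread's write)
```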

You may need to "window" the data, emitting the event only when a file has 0 open references for a certain amount of time, so that overlapping operations that are slightly reordered are less likely to emit a close event before the file is actually fully closed.
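A minimal sketch of that windowing idea in plain Ruby, with explicit timestamps so the behaviour is easy to follow. `Windower`, `WINDOW` and the 5-second value are hypothetical; in a real pipeline something would have to call `flush` periodically. Within Logstash, the aggregate filter's `inactivity_timeout` together with `push_map_as_event_on_timeout` may give a similar effect without custom code, though that is worth checking against the plugin documentation.

```ruby
# Emit a task's summary only after its open-handle count has stayed at 0
# for `window` seconds; a reopen inside the window cancels the pending emit.
WINDOW = 5 # seconds, hypothetical value

class Windower
  def initialize(window)
    @window = window
    @tasks = {} # task_id => { open:, zero_since:, counts: }
  end

  def event(task_id, op, now)
    t = (@tasks[task_id] ||= { open: 0, zero_since: nil, counts: Hash.new(0) })
    case op
    when 'open'
      t[:open] += 1
      t[:zero_since] = nil # file is in use again
    when 'close'
      t[:open] -= 1 if t[:open] > 0
      t[:zero_since] = now if t[:open].zero?
    else
      t[:counts][op] += 1
    end
  end

  # Returns [task_id, counts] pairs for tasks idle for at least the window,
  # and forgets them.
  def flush(now)
    ready = @tasks.select { |_, t| t[:zero_since] && now - t[:zero_since] >= @window }
    ready.each_key { |id| @tasks.delete(id) }
    ready.map { |id, t| [id, t[:counts]] }
  end
end

w = Windower.new(WINDOW)
id = 'pathx_userx'
w.event(id, 'open', 0)
w.event(id, 'close', 1) # count reaches 0 at t=1 ...
w.event(id, 'open', 2)  # ... but a late open cancels the pending emit
w.event(id, 'write', 3)
w.event(id, 'close', 4) # count reaches 0 again at t=4
puts w.flush(6).size    # prints 0 (idle for only 2 s)
puts w.flush(9).size    # prints 1 (idle for 5 s: summary emitted)
```

The trade-off is latency: every summary is delayed by at least the window, and a window shorter than the typical reordering gap will not help.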

In any case, you will need to add mutexes around operations that non-atomically mutate shared state (such as when using Hash#[]=) in order to prevent race conditions.
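For reference, a minimal plain-Ruby sketch of guarding a shared counter with a Mutex. `counts[key] += 1` expands to a Hash#[] read followed by a Hash#[]= write, so without the lock two threads could interleave between the two calls. The thread and iteration counts are arbitrary.

```ruby
LOCK = Mutex.new
counts = Hash.new(0) # shared state, e.g. per-task counters

threads = Array.new(4) do
  Thread.new do
    1_000.times do
      # the read-modify-write happens while holding the lock
      LOCK.synchronize { counts['open_num'] += 1 }
    end
  end
end
threads.each(&:join)

puts counts['open_num'] # prints 4000
```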


#3

We are indeed using the pipeline with only 1 worker, since the number of users using that part of the system at the same time is very small. In the future, if parallelism becomes necessary and thus race conditions become a problem, we'll take a look at mutexes for the counters.

Can you elaborate on the "windowing" of the data, especially on how to delay emitting an event based on a condition?

Also, is it possible to trigger the equivalent of "end_of_task" from within the Ruby code?


(system) #5

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.