Aggregate filter - Parsing Jenkins log

Dear ELK community,

I'm quite new to ELK stack and I'm trying to figure out what can I really achieve with Logstash.
I'm trying to extract some information from Jenkins logs, and I found the aggregate filter plugin which is pretty powerful.

My Jenkins logs looks like this :

[Timestamp] Started by user foo
[Timestamp] Running on nodename1
[Timestamp] some process logs which will be drop
[Timestamp] [Pipeline] \\ node
[Timestamp] Running on nodename2
[Timestamp] some process logs which will be drop
[Timestamp] [Pipeline] \\ node
[Timestamp] Running on nodename1
[Timestamp] some process logs which will be drop
[Timestamp] [Pipeline] \\ node
[Timestamp] Finished: SUCCESS

With a combination of grok + aggregate plugins I managed to extract some informations and display them in kibana like this :

As you can see above, I created a "nodes" field containing a concatenation of (nodename,timestamp). What I wanted to do was to retrieve the job execution duration on each node. I thought that retrieving (nodename,timestamp), could be good enough to be used in kibana later but as I feared it turns out that I can't really exploit this field in kibana (or at least I'm too inexperienced to do it). My ultimate goal is to compute the execution duration on each node and to sum it up in kibana in order to have the occupation % of my jenkins nodes

Here's the code on how I retrieved my field 'nodes'

filter {
  grok  {
    match => [ "message", "%{TIMESTAMP_ISO8601:currentTime}]%{SPACE}%{NOTSPACE:firstword} %{GREEDYDATA:remaining}" ]
  }

#Extracting node name and node starting time
if [firstword] == "Running" {
  grok {
      another grok here
       }
    aggregate {
         task_id => "%{[log][file][path]}"
         code => "
         map['node'] ||= []
         map['node']  += [event.get('node'),event.get('currentTime')]
         "
         map_action => "create_or_update"
         }
 }

I then, noticed that I could use the line as ending event :

[Timestamp] [Pipeline] \\ node

To pair with each

[Timestamp] Running on xxxxxx

In order to get the duration time of the job on that node.

Here's the output I expect :

-One "profile" line per job as shown in the previous screen (Already got this line)
- nodename | starting_time |finishing_time | duration_of_the job
- nodename | starting_time |finishing_time | duration_of_the job
- nodename | starting_time |finishing_time | duration_of_the job

Ideally I wanted to do this in another logstash.conf file but I didn't find how to do it. So I changed my code supposing that for the same log , I could use different task_id for the aggregate plugin

#Whenever I detect the line starting with "Running"
if [firstword] == "Running" {
  grok {
       match => [ "remaining", "%{NOTSPACE:on} %{GREEDYDATA:node} in %{GREEDYDATA:greedy}/workspace/%{USERNAME:projet}"]
       }
    #This aggregate filter will push in the "main event"
    aggregate {
         task_id => "%{[log][file][path]}"
         code => "
         some data extraction
         "
         map_action => "create_or_update"
         }
#Aggregating for the new sub event 
    aggregate {
         task_id => "%{node}"
         code => "
         map['node'] = event.get('node')
         map['starting_time_on_node'] = event.get('currentTime')
         "
         map_action => "create_or_update"

And Whenever I detect [Pipeline] \node line I push the sub event

   aggregate {
        task_id => "%{node}"
        code => "
        map['finishing_time_on_node'] = event.get('currentTime')
        event.set('starting_time_on_node',map['starting_time_on_node'])
        event.set('finishing_time_on_node',map['finishing_time_on_node']) 
         ## Something like that for the duration
        event.set ('duration, map['finishing_time_on_node'] - ['starting_time_on_node'])
        "
        map_action => "create_or_update"
        end_of_task => true

Unfortunately, the logstash pipeline doesn't seem to build and I'm stuck with this error :
(aggregate filter code):4: unterminated string meets end of file

Furthermore, I don't know which task_id I should use for my 'sub events' because If I pick 'node' I can have the same task_id again therefore it won't aggregate again. Ideally I would like to use a "counter" which increments everytime the pair Running on / [Pipeline] \ node is sent. Can I really use multiple task_id for parsing the same log ?

For one log file that is sent by filebeat, can I apply different logstash.conf to it ? So I can separate my processes into two different files : one processing the whole log , the other one pushing the pairs ( Running on / [Pipeline] \ node)

I'll be glad to hear some of your experiences or have feedback on what I'm doing wrong or If there's another way to answer my needs.

You will need to escape the quotes in the map references.

I think your data model may be broken. I am guessing each log is unique across project + job_name + build. Why not have one document per stage? You would just need to aggregate across

[Timestamp] Running on nodename2
[Timestamp] [Pipeline] \\ node

to get the timestamps, and calculate the duration.

Thank you for your reply. Having one document per stage sounds good, but how can I actually split the whole log in multiple documents of :

[Timestamp] Running on nodename
[Timestamp] [Pipeline] \\ node

In the aggregate filter create an array with an entry for each stage and then use a split filter.

I realize you may be unable to post unredacted logs but it really would be much easier to help if you could.

Thanks for your answer Badger, I can provide this dummy log :

2021-08-20 10:03:40  Started by user foo
2021-08-20 10:03:40  Replayed #68
2021-08-20 10:03:41  [Pipeline] Start of Pipeline
2021-08-20 10:03:41  [Pipeline] node
2021-08-20 10:03:41  Running on node1 in /srv/jenkins/workspace
2021-08-20 10:03:41  [Pipeline] {
2021-08-20 10:03:42  [Pipeline] cleanWs
2021-08-20 10:03:42  [WS-CLEANUP] Deleting project workspace...
2021-08-20 10:03:42  [WS-CLEANUP] Deferred wipeout is used...
2021-08-20 10:03:42  [WS-CLEANUP] done
2021-08-20 10:03:46  [Pipeline] // stage
2021-08-20 10:03:46  [Pipeline] stage
2021-08-20 10:03:46  [Pipeline] { (Maven)
2021-08-20 10:03:46  Stage "Maven" skipped due to earlier failure(s)
2021-08-20 10:03:46  [Pipeline] }
2021-08-20 10:03:46  [Pipeline] // stage
2021-08-20 10:03:46  [Pipeline] stage
2021-08-20 10:03:46  [Pipeline] { (Pypi)
2021-08-20 10:03:46  Stage "Pypi" skipped due to earlier failure(s)
2021-08-20 10:03:46  I failed :( 
2021-08-20 10:03:46  [Pipeline] mail
2021-08-20 10:03:46  [Pipeline] script
2021-08-20 10:03:46  [Pipeline] {
2021-08-20 10:03:46  [Pipeline] sh
2021-08-20 10:03:46  [Pipeline] cleanWs
2021-08-20 10:03:46  [WS-CLEANUP] Deleting project workspace...
2021-08-20 10:03:46  [WS-CLEANUP] Deferred wipeout is used...
2021-08-20 10:03:46  [WS-CLEANUP] done
2021-08-20 10:03:46  [Pipeline] }
2021-08-20 10:03:46  [Pipeline] // script
2021-08-20 10:03:46  [Pipeline] }
2021-08-20 10:03:46  [Pipeline] // stage
2021-08-20 10:03:46  [Pipeline] }
2021-08-20 10:03:46  [Pipeline] // node
2021-08-20 10:03:46  [Pipeline] node
2021-08-20 10:03:47  Running on node2 in /srv/jenkins/workspace
2021-08-20 10:03:47  [Pipeline] echo
2021-08-20 10:03:47  I failed :( 
2021-08-20 10:03:47  [Pipeline] }
2021-08-20 10:03:47  [Pipeline] // node
2021-08-20 10:03:47  [Pipeline] End of Pipeline
2021-08-20 10:03:47  Finished: FAILURE

You are right, the logs can be very different everytime, but the lines starting by 'Running on xxxx' and [Pipeline] \ node will always be there to mark the start and the end of a job execution on a node therefore I still believe I can retrieve those pair of information.

As you suggested I'm very interested in trying to create an array for each stage. And then use the split filter.
So here I'm only interested in every line beginning by "Running on" (which means the start of an execution on my node) and every line which is equals to ' [Pipeline] // node (which is the end)

I tried this aggregate filter which will put the nodename,the starting timestamp, and the ending timestamp in an array.

#The line start by "Running on "
if [firstword] == "Running" {
  grok {
       match => [ " some grok "]
       }
    aggregate {
         task_id => "%{[log][file][path]}"
         code => "
         map['node'] ||=[]
         map['node']['starting_time_on_node'] ||=[]
         map['node'] << event.get('node_name')
         map['node']['starting_time_on_node'] << event.get('currentTime')
         "
         map_action => "create_or_update"
         }
}

# The line is  [Pipeline] \\ node which means I'm done working on this node
if [firstword] == "[Pipeline]" {
    grok {
       match => [ " some grok "]
       }
   aggregate {
        task_id => "%{[log][file][path]}"
        code => "
        map['node']['starting_time_on_node']['finishing_time_on_node'] ||=[]
        map['node']['starting_time_on_node']['finishing_time_on_node'] << event.get('currentTime)
   "
   map_action => "create_or_update"
   }

And then I tried to push the events like this :

aggregate {
  task_id => "%{[log][file][path]}"
  code => "
  event.set('nodes', map['node'])
  "
  map_action => "create_or_update"
  end_of_task => true

}
 

I was expecting to see in elastic my first array but unfortunately when I push logs on logstash I got this error :

Aggregate exception occurred {:error=>#<TypeError: no implicit conversion of String into Integer>

I tried to read other posts on creating arrays with aggregate filter and I don't understand the error I get. Your help is really appreciated,

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.