Get a unique record from a block of traces

Hello, I need to get a unique record from a block of traces.

For example, I have the following block of traces:
2019-01-15 22:00:24,2534181078938 ERROR - ID_TRAZA_0001 * FAIL_1 * CODE111 * Error 1
2019-01-15 22:00:24,2534181078938 ERROR - ID_TRAZA_0001 * FAIL_2 * CODE112 * Error 2
2019-01-15 22:00:24,2534181078938 ERROR - ID_TRAZA_0001 * FAIL_3 * CODE113 * Error 3
2019-01-15 22:00:24,2534181078938 ERROR - ID_TRAZA_0001 * FAIL_4 * CODE114 * Error 4
2019-01-15 22:00:24,2534181078938 ERROR - ID_TRAZA_0001 * FAIL_5 * CODE555 * Error 5

I just need one field from each line. I want to send a single document to Elasticsearch, like this:
2019-01-15 ID_TRAZA_0001 FAIL_4 CODE555

How can I do this?

Do you always get 5 lines?

Thank you for answering so fast.

I just want to send one unique record from this log file, taking the necessary information from each line.

For example:
From line 1 I get: 2019-01-15
From line 2 I get: ID_TRAZA_0001
From line 3 I get: FAIL_3
From line 4 I get: CODE114
From line 5 I get: Error 5

I want to send a single document to Elasticsearch with a mix of that information:
2019-01-15 ID_TRAZA_0001 FAIL_3 CODE114 Error 5

Is it possible to do this?

OK.
We need a fail-safe "start" signature and a fail-safe "end" signature.
I'm thinking these are FAIL_1 and FAIL_5, correlated on ID_TRAZA_0001.

I think the aggregate filter can do this.

OK, I have added 2 lines, start and finish.

Imagine a log file like this:

....
ID_TRAZA_0001 * FAIL_1 * CODE111 * START LOG
ID_TRAZA_0001 * FAIL_2 * CODE112 * Error 2
ID_TRAZA_0001 * FAIL_3 * CODE113 * Error 3
ID_TRAZA_0001 * FAIL_4 * CODE114 * Error 4
ID_TRAZA_0001 * FAIL_5 * CODE555 * FIN LOG
ID_TRAZA_0002 * FAIL_1 * CODE111 * START LOG
ID_TRAZA_0002 * FAIL_2 * CODE112 * Error 2
ID_TRAZA_0002 * FAIL_3 * CODE113 * Error 3
ID_TRAZA_0002 * FAIL_4 * CODE114 * Error 4
ID_TRAZA_0002 * FAIL_5 * CODE555 * FIN LOG
.....

I want to get 2 documents to send to Elasticsearch.

Document 1:
2019-01-15 ID_TRAZA_0001 FAIL_4 CODE555

Document 2:
2019-01-15 ID_TRAZA_0002 FAIL_9 CODE777

Do you think it is possible to do this with the aggregate filter?

Yes.

I am working up a config now. I am not very familiar with the aggregate filter so I need the practice.

Do you have control over the format of START LOG and FIN LOG?

It would be better if those two lines had an identifier in them.

How about this:

input {
  generator {
    lines => [
      '2019-01-15 22:00:24,001 ERROR - ID_TRAZA_0001 * FAIL_1 * CODE111 * Error 1',
      '2019-01-15 22:00:24,002 ERROR - ID_TRAZA_0001 * FAIL_2 * CODE112 * Error 2',
      '2019-01-15 22:00:24,003 ERROR - ID_TRAZA_0001 * FAIL_3 * CODE113 * Error 3',
      '2019-01-15 22:00:24,004 ERROR - ID_TRAZA_0001 * FAIL_4 * CODE114 * Error 4',
      '2019-01-15 22:00:24,005 ERROR - ID_TRAZA_0001 * FAIL_5 * CODE555 * Error 5'
    ]
    count => 1
  }
}

filter {
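  # parse the timestamp, log level, and the four *-separated fields out of the raw line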
  dissect {
    mapping => {
      message => '%{date} %{+date} %{level} - %{identity} * %{failure} * %{code} * %{error_number}'
    }
  }

  if [failure] == "FAIL_1" {
    aggregate {
      task_id => "%{identity}"
      # stash the info from event 1 and cancel it
      code => "map['date'] = event.get('date'); event.cancel"
      map_action => "create"
    }
  }
  if [failure] == "FAIL_2" {
    aggregate {
      task_id => "%{identity}"
      # stash the info from event 2 and cancel it
      code => "map['identity'] = event.get('identity'); event.cancel"
      map_action => "update"
    }
  }
  if [failure] == "FAIL_3" {
    aggregate {
      task_id => "%{identity}"
      # stash the info from event 3 and cancel it
      code => "map['failure'] = event.get('failure'); event.cancel"
      map_action => "update"
    }
  }
  if [failure] == "FAIL_4" {
    aggregate {
      task_id => "%{identity}"
      # stash the info from event 4 and cancel it
      code => "map['code'] = event.get('code'); event.cancel"
      map_action => "update"
    }
  }
  if [failure] == "FAIL_5" {
    aggregate {
      task_id => "%{identity}"
      # update event 5 with stashed info from previous events
      code => "event.set('date', map['date']); event.set('identity', map['identity']); event.set('failure', map['failure']); event.set('code', map['code'])"
      end_of_task => true
      timeout => 120
    }
  }
}

output {
  stdout { codec => rubydebug }
}

Gives:

{
            "code" => "CODE114",
            "date" => "2019-01-15 22:00:24,001",
        "sequence" => 0,
      "@timestamp" => 2019-01-21T14:04:17.523Z,
    "error_number" => "Error 5",
        "@version" => "1",
            "host" => "Elastics-MacBook-Pro.local",
         "failure" => "FAIL_3",
           "level" => "ERROR",
         "message" => "2019-01-15 22:00:24,005 ERROR - ID_TRAZA_0001 * FAIL_5 * CODE555 * Error 5",
        "identity" => "ID_TRAZA_0001"
}

I get the impression that this is not your actual data but I think you can see what you need to adjust.

IMPORTANT:
set pipeline.workers: 1 in config/logstash.yml
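When testing from the command line you can also set this with the -w flag (the pipeline filename here is just a placeholder):

bin/logstash -w 1 -f your-pipeline.conf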

I'm going to try this.

Thank you

It is working

Thank you for your example with aggregate.

Glad to help.

Bear in mind that your performance might suffer with 1 worker thread but I don't know of a better solution.

Hello again

I need to add 2 or more fields in the aggregate filter.

For example, I have tried this:

grok {
  match => { "text" => "<.*TxId>%{DATA:id_op}<.*TxId>.*" }
}
aggregate {
  task_id => "%{idTrace}"
  code => "map['id_op'] = event.get('id_op'); event.cancel;"
  map_action => "create"
}

aggregate {
  task_id => "%{idTrace}"
  code => "map['some'] = "something"; event.cancel;"
  map_action => "update"
}

But it doesn't work.

I have tried this also:

grok {
  match => { "text" => "<.*TxId>%{DATA:id_op}<.*TxId>.*" }
}
aggregate {
  task_id => "%{idTrace}"
  code => "map['id_op'] = event.get('id_op'); event.cancel; map['some'] = "something"; event.cancel;"
  map_action => "create"
}

It doesn't work either.

I have one more doubt: how can I subtract 2 values?

Please, do you have any ideas?

Thank you very much.

Post more of your newer config to answer questions like:
Where is the field idTrace coming from?

You only need to cancel the event once.
If your aggregate filter sections are not inside conditionals and each one cancels the event, you end up cancelling every event and will get no output at all.
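For example, here is a minimal sketch of setting several map fields in one code block (the [failure] conditional is a placeholder for whatever identifies your first line, and note the inner value uses single quotes so it does not terminate the surrounding double-quoted Ruby string):

if [failure] == "FAIL_1" {
  aggregate {
    task_id => "%{idTrace}"
    # stash both fields in the map, then cancel this event once
    code => "map['id_op'] = event.get('id_op'); map['some'] = 'something'; event.cancel"
    map_action => "create"
  }
}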

As I suspected, your original post was a made-up scenario describing only some aspects of your real problem. That is fine, but we can't read minds, so either explain the whole problem fully or read the aggregate filter docs, which cover a few scenarios, including adding and subtracting values.
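For subtraction, a sketch along the same lines (start_value and end_value are hypothetical numeric fields, and the conditionals stand in for whatever marks your first and last lines):

if [failure] == "FAIL_1" {
  aggregate {
    task_id => "%{idTrace}"
    # stash the starting value as a float
    code => "map['start'] = event.get('start_value').to_f; event.cancel"
    map_action => "create"
  }
}
if [failure] == "FAIL_5" {
  aggregate {
    task_id => "%{idTrace}"
    # subtract the stashed start value from this event's end value
    code => "event.set('difference', event.get('end_value').to_f - map['start'])"
    end_of_task => true
    timeout => 120
  }
}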
