Method to timestamp my logstash events

Hello everybody,

I have this input log:

INFO;0000;000001;******************************************************************************************************************
INFO;0000;000002;*                                                LOG D'EXECUTION                                                 *
INFO;0000;000003;*                        /data/EDT/batchs/files/logs/MNS014MT-20180612-20180613-0105.log                         *
INFO;0000;000004;*                                            2018-06-13 01:05:43:448                                             *
INFO;0000;000005;*                    MNS014MT - Prélèvement et validation d'échéancier suite à saisie de RIB                     *
INFO;0000;000006;******************************************************************************************************************
INFO;0000;000007;
INFO;0000;000008;   les paramètres du batch sont :
INFO;0000;000009;      l'utilisateur (code : USER) : B-GF-UEM
INFO;0000;000010;   le champ "FETCH_SIZE_VALUE" n'existe pas dans le fichier de propriétes du batch. utilisati
INFO;0000;000011;   on de la valeur par défaut.
INFO;0000;000012;
INFO;0000;000013;   Version de l'application : 13.13.100 (build : af9c96c, date : 09-05-2018 17:21:19)
INFO;0000;000014;      Version de l'architecture : 4.143.500 (build : 879ab1c, date : 30-04-2018 09:42:03)
INFO;0000;000015;      Version du framework : 4.143.500 (build : 879ab1c, date : 30-04-2018 09:42:03)
INFO;0000;000016;      Version EDK : 4.131.500 (build : 1561e01, date : 02-05-2018 14:58:47)
INFO;0000;000017;      Version ecore : 4.140.500 (build : 3eef259, date : 03-05-2018 15:49:45)
INFO;0000;000018;   Utilisateur Oracle : HERMES_USER
INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ
MNOR;3011;000036;   Erreur de traitement d'un élément
MNOR;3012;000037;      Erreur lors de la mise en transaction
MNOR;4009;000038;         aucune ligne de compte à mettre en transaction (compte:625316)
INFO;0000;000095;
INFO;0000;000096;CODE RETOUR : -4
INFO;0000;000097;
INFO;0000;000098;******************************************************************************************************************

I use Logstash to parse this log and keep only the useful information, which looks like this:

INFO;0000;000003;*                        /data/EDT/batchs/files/logs/MNS014MT-20180612-20180613-0105.log                         *
INFO;0000;000005;*                    MNS014MT - Prélèvement et validation d'échéancier suite à saisie de RIB                     *
INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ
MNOR;3011;000036;   Erreur de traitement d'un élément
MNOR;3012;000037;      Erreur lors de la mise en transaction
MNOR;4009;000038;         aucune ligne de compte à mettre en transaction (compte:625316)
INFO;0000;000096;CODE RETOUR : -4

Thanks to my pipeline, I extract the date of the log into a "DATE_BATCH" field.

Now I want to prepend this field to the beginning of every output message, but I don't know how to do that.

This is my pipeline:

input
{
  beats
  {
    port => 5044
  }
}
filter
{
  grok
  {
    match => { "message" => [ "%{WORD:TYPE};%{NUMBER:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}" ] }
  }

  if ([DESCRIPTION] =~ "CODE")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_RETOUR}" ] }
    }
  }

  if ([ID2] == "000003")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{DATA:TEST}/%{WORD:NOM_BATCH}-%{BASE16NUM:DATE_BATCH}-%{GREEDYDATA:reste}" ] }
    }
  }

  #### Drop every useless line ####
  if ([TYPE] == "INFO")
  {
    if ([ID2] != "000003" and [ID2] != "000005")
    {
      if ([DESCRIPTION] !~ "info BDD" and [DESCRIPTION] !~ "CODE RETOUR")
      {
        drop { }
      }
    }
  }
  ##################

  date
  {
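    # on a successful match, the date filter writes the parsed value into @timestamp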
    match => [ "DATE_BATCH", "yyyyMMdd" ]
  }
}
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "gestapplication"
  }
}

I want to do this in order to timestamp all my messages, because currently only the message which contains the "DATE_BATCH" information is correctly timestamped.

For example, my last log line currently looks like:

TYPE => INFO
ID1 => 0000
ID2 => 000096
DESCRIPTION => CODE RETOUR : -4
CODE_RETOUR => -4
MESSAGE => INFO;0000;000096;CODE RETOUR : -4

I would like to have :

TYPE => INFO
ID1 => 0000
ID2 => 000096
DESCRIPTION => CODE RETOUR : -4
CODE_RETOUR => -4
DATE_BATCH => 20180612 (field parsed from the first log line)
MESSAGE => %{DATE_BATCH};INFO;0000;000096;CODE RETOUR : -4

Something like that... I think you get the idea: I just want to have the timestamp on every line.
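
(I imagine that once DATE_BATCH exists on every event, prepending it to the message would just be a mutate with a sprintf reference, something like the sketch below; my real problem is getting DATE_BATCH onto every event in the first place.)

  mutate
  {
    # hypothetical sketch: prepend the batch date to each message
    replace => { "message" => "%{DATE_BATCH};%{message}" }
  }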

Thanks to all!

Provided that you use a single pipeline worker thread (--pipeline.workers 1), you could save it in a ruby instance variable. Something like:

if [ID2] == "000003" {
    ruby { code => "@save_the_date = event.get('DATE_BATCH')" }
} else {
    ruby { code => "event.set('DATE_BATCH', @save_the_date)" }
}

Hi @Badger

I just changed my pipeline; it now looks like:

if ([ID2] == "000003")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{DATA:TEST}/%{WORD:NOM_BATCH}-%{BASE16NUM:DATE_BATCH}-%{GREEDYDATA:RESTE}" ] }
    }
    ruby 
    {
      code => "@save_the_date = event.get('DATE_BATCH')" 
    }
  }
  else
  {
    ruby 
    { 
      code => "event.set('DATE_BATCH', @save_the_date)"
    }
  }

But now, how do I specify that the timestamp should come from "@save_the_date"?

Because when I go to Kibana and select my index name, I only have "@timestamp" and "don't use timestamp" as choices for timestamping my events. I don't see my new @save_the_date field anywhere.

Thanks for your answer.

PS: I also tried to add:

  date
  {
    match => [ "@save_the_date", "yyyyMMdd" ]
  }

but that doesn't work either.

I was assuming that you would still be running the date filter against DATE_BATCH for every event. The ruby code just makes sure that DATE_BATCH is always there.
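
That is, keep the date filter exactly as it is in your pipeline; once DATE_BATCH is present on every event, the filter parses it on every event, and on a successful match it writes the result into @timestamp, which is the field Kibana offers:

  date
  {
    match => [ "DATE_BATCH", "yyyyMMdd" ]
  }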

@Badger

I just replaced my date section with yours, but it still doesn't work. The "DATE_BATCH" field is now correctly added to all my events, but it is empty for the lines which don't contain the date...

This is my new pipeline:

input
{
  beats
  {
    port => 5044
  }
}
filter
{
  grok
  {
    match => { "message" => [ "%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}" ] }
  }
  if ([DESCRIPTION] =~ "CODE")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_RETOUR}" ] }
    }
  }
  if ([ID2] == "000003")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{DATA:TEST}/%{WORD:NOM_BATCH}-%{BASE16NUM:DATE_BATCH}-%{GREEDYDATA:RESTE}" ] }
    }
    ruby
    {
      code => "@save_the_date = event.get('DATE_BATCH')"
    }
  }
  else
  {
    ruby
    {
      code => "event.set('DATE_BATCH', @save_the_date)"
    }
  }
  if ([TYPE] == "INFO")
  {
    if ([ID2] != "000003" and [ID2] != "000005")
    {
      if ([DESCRIPTION] !~ "info BDD" and [DESCRIPTION] !~ "CODE RETOUR")
      {
        drop { }
      }
    }
  }
  date
  {
    match => [ "DATE_BATCH", "yyyyMMdd" ]
  }
  mutate
  {
    remove_field => [ "@version","ID1","TEST","RESTE","_id","_index","_score","_type","beat.hostname","beat.name","beat.version","filetype","host","offset","prospector.type" ]
    convert => { "CODE_RETOUR" => "integer" }
  }
}
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "test"
  }
}

I think the line

code => "event.set('DATE_BATCH', @save_the_date)"

doesn't fill my DATE_BATCH field correctly, because it is empty:

Kibana view: [screenshot showing the empty DATE_BATCH field]

Do you understand why?

Thank you.

PS: To launch my pipeline I used:
bin/logstash -f /etc/logstash/conf.d/vega_gestapp.conf --pipeline.workers 1

And I added this line to my logstash.yml file:
pipeline.workers: 1

Sorry about that, I should have tested it before posting. A ruby instance variable is specific to the filter instance, so you cannot set it in one and use it in the other. You have two options. You can use a class variable instead of an instance variable.

if [ID2] == "000003" {
    ruby { code => "@@save_the_date = event.get('DATE_BATCH')" }
} else {
    ruby { code => "event.set('DATE_BATCH', @@save_the_date)" }
}

Or you can do everything in a single filter instance

ruby {
    code => "
        if event.get('ID2') == '000003'
            @save_the_date = event.get('DATE_BATCH')
        else
            event.set('DATE_BATCH', @save_the_date)
        end
    "
}
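
(This works because both branches now execute inside the same ruby filter instance, so they share the same @save_the_date.)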

Note that this kind of thing can be fragile. It assumes lines are processed in order. That is why you are restricted to a single pipeline worker. But note that with the new execution engine in 6.3.0 that is no longer true even for a single worker -- stuff gets re-ordered.

Thank you @Badger, your first solution works very well!

I'm marking this discussion as solved :smiley:

Hi @Badger

I just realized that this solution only works when my pipeline reads the files slowly:

If I send one file, give Logstash time to parse it, then send another one, and so on, it works very well...

...BUT when I send all the files at once, it gets my DATE_BATCH and NOM_BATCH fields mixed up and the results are not coherent at all...

Is it possible to put a pause between each file read, or to queue the files?

For example, right now in my Filebeat log I have:

2018-06-22T15:23:55.804+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-2391460397.log
2018-06-22T15:23:55.805+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-279990785.log
2018-06-22T15:23:55.806+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-3654726275.log
2018-06-22T15:23:55.806+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-400042090.log
2018-06-22T15:23:55.811+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-1850010684.log
2018-06-22T15:23:55.812+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-1929394145.log
2018-06-22T15:23:55.815+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-384824503.log
2018-06-22T15:23:55.818+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-2854993123.log
2018-06-22T15:23:55.819+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-279990785.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.821+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-3489823606.log
2018-06-22T15:23:55.824+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-2854993123.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.826+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-3489823606.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.864+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-1929394145.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.865+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-2391460397.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.865+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-3654726275.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.865+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-1850010684.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.865+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-400042090.log. Closing because close_eof is enabled.
2018-06-22T15:23:55.865+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-384824503.log. Closing because close_eof is enabled.

And what I would like to have is:

2018-06-22T15:23:55.804+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-2391460397.log
2018-06-22T15:23:55.865+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-2391460397.log. Closing because close_eof is enabled.
#GO TO ElasticSearch
2018-06-22T15:23:55.805+0200	INFO	log/harvester.go:216	Harvester started for file: C:\Users\busy\Desktop\bic\trie-279990785.log
2018-06-22T15:23:55.819+0200	INFO	log/harvester.go:239	End of file reached: C:\Users\busy\Desktop\bic\trie-279990785.log. Closing because close_eof is enabled.
#GO TO ElasticSearch
...
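
The actual Filebeat-side fix isn't quoted in this thread, but for illustration: one knob that limits parallel reads is harvester_limit, which caps the number of harvesters started at once for an input. A minimal sketch of a Filebeat 6.x prospector config (the paths and values here are assumptions):

filebeat.prospectors:
  - type: log
    paths:
      - C:\Users\busy\Desktop\bic\*.log
    close_eof: true
    # assumption for illustration: harvest at most one file at a time
    harvester_limit: 1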

Steffens, in the Filebeat section, has solved my problem.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.