Keep only a certain number of lines of the same type

Hello ELK community.

I use Logstash and Filebeat to parse files contained in a directory.

For example, one of my log files looks like this:

INFO;0000;000001;***************************************************************
INFO;0000;000002;*                                                LOG D'EXECUTION                                                 *
INFO;0000;000003;*                        /data/EDT/batchs/files/logs/REG023MT-20180612-20180612-1940.log                         *
INFO;0000;000004;*                                            2018-06-12 19:41:11:117                                             *
INFO;0000;000005;*                                  REG023MT - Générer Décaissement par virement                                  *
INFO;0000;000006;***************************************************************
INFO;0000;000007;
INFO;0000;000016;      Version EDK : 4.131.500 (build : 1561e01, date : 02-05-2018 14:58:47)
INFO;0000;000017;      Version ecore : 4.140.500 (build : 3eef259, date : 03-05-2018 15:49:45)
INFO;0000;000018;   Utilisateur Oracle : HERMES_USER
INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ
INFO;0000;000020;
INFO;0000;000021;   paramètres d'exécution ajustables:
INFO;0000;000022;      nombre de threads max : 8
INFO;0000;000023;      taille des lots techniques : 10
INFO;0000;000024;      commit actif : true
INFO;0000;000025;      type de transaction : transaction au fil de l'eau
INFO;0000;000026;      utilisation truncate pour vider les tables batch : false
INFO;0000;000027;
INFO;0000;000028;
INFO;0000;000029;   -----------------------------------------------------------------------
INFO;0000;000030;   - Traitement de l'entreprise "energem achat"
INFO;0000;000031;   -----------------------------------------------------------------------
INFO;0000;000032;
INFO;0000;000033;    
INFO;0000;000187;   >>>>>>>>>>> Informations diverses                               
INFO;0000;000188;   Référence                      :BD1206201800070
INFO;0000;000189;   Banque                         :BPALC 31421257010                  
INFO;0000;000190;   Montant                        :-8 466,7           
INFO;0000;000191;   Nombre d'élément(s)            :95             
INFO;0000;000192;   Date de constitution           :12/06/2018
INFO;0000;000193;   
INFO;0000;000194;
TECH;P001;000195;   erreur technique lors de l'exécution du job principal
TECH;T003;000196;      erreur imprévue lors de l'exécution du job (GenererDecaissementPreleverOffreProduitJob_
TECH;T003;000197;      10_39)
TECH;T003;000198;                  Exception levée : java.lang.NullPointerException
TECH;T003;000199;                  Message : 
TECH;T003;000200;                     Trace : 
TECH;T003;000201;                     java.lang.String.compareTo(String.java:1155)
TECH;T003;000202;                     com.hermes.rec.reglement.businessprocess.PerceptionProcess.estClientRattacheAu
TECH;T003;000203;                     PercepteurParCodesEtablissementEtGuichet(PerceptionProcess.java:46)
TECH;T003;000204;                     com.hermes.rec.reglement.businessprocess.PerceptionProcess.estClientRattacheAu
TECH;T003;000205;                     PercepteurParCodesEtablissementEtGuichet(PerceptionProcess.java:40)
TECH;T003;000206;                     com.hermes.rec.reglement.businessprocess.PerceptionProcess.isClientRattachePer
TECH;T003;000207;                     cepteur(PerceptionProcess.java:30)
TECH;T003;000208;                     com.hermes.rec.reglement.businessprocess.GenererDecaissementProcess.creationOp
TECH;T003;000247;                     com.hermes.rec.reglement.businessprocess.PerceptionProcess.isClientRattachePer
TECH;T003;000248;                     cepteur(PerceptionProcess.java:30)
TECH;T003;000249;                     com.hermes.rec.reglement.businessprocess.GenererDecaissementProcess.creationOp
TECH;T003;000250;                     eration(GenererDecaissementProcess.java:104)
TECH;T003;000251;                     com.hermes.rec.reglement.businessprocess.GenererOperationFinanciereProcess.cre
TECH;T003;000252;                     ationOperationFinanciereMultipleParCompte(GenererOperationFinanciereProcess.ja
TECH;T003;000253;                     va:611)
INFO;0000;000445;      traitement de consolidation : 0h 0' 0" 3ms
INFO;0000;000446;
INFO;0000;000447;
INFO;0000;000448;   TEMPS D'EXECUTION : 0h 0' 18" 947ms
INFO;0000;000449;
INFO;0000;000450;CODE RETOUR : -8
INFO;0000;000451;

Then, with Logstash, I parse this file to keep only:

  • Line 3 (/data/...)
  • Line 5 (REG023...)
  • Line 19 (info BDD ...)
  • Every line whose first column is "TECH".
  • The line containing "CODE RETOUR" at the end of the file.

So my question is: rather than keeping all the TECH lines, can I keep only, say, the first 5 of them? The information that interests me is in those first 5 lines and I would like to drop the rest.
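
What I have in mind is something like the sketch below, reusing the class-variable trick I already use for DATE_BATCH in my pipeline: count the TECH lines and cancel the event once the counter goes past 5. It is only a sketch and assumes a single pipeline worker (pipeline.workers: 1) so the lines of one file are processed in order; the counter would also need to be reset when a new file starts, for example with @@tech_count = 0 in the same branch where I save DATE_BATCH. Is something like this reasonable, or is there a cleaner way?

  if ([TYPE] == "TECH")
  {
    ruby
    {
      # @@tech_count is shared across events, like @@save_the_date below
      init => "@@tech_count = 0"
      code => "
        @@tech_count += 1
        event.cancel if @@tech_count > 5
      "
    }
  }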

My current pipeline looks like this:

input
{
  beats
  {
    port => 5044
  }
}
filter
{
  grok
  {
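    # split each line on ';' into TYPE / ID1 / ID2 / DESCRIPTION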
    match => { "message" => [ "%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}" ] }
  }
  if ([DESCRIPTION] =~ "CODE")
  {
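    # pull the numeric value out of the "CODE RETOUR" line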
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_RETOUR}" ] }
    }
  }
  if ([ID2] == "000003")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{WORD:NOM_BATCH}-%{BASE16NUM:DATE_BATCH}" ] }
    }
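    # class variables persist across events (with a single pipeline worker), so the batch name/date from line 000003 can be copied onto the following lines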
    ruby { code => "@@save_the_date = event.get('DATE_BATCH')" }
    ruby { code => "@@save_the_name = event.get('NOM_BATCH')" }
  }
  else
  {
    ruby { code => "event.set('DATE_BATCH', @@save_the_date)" }
    ruby { code => "event.set('NOM_BATCH', @@save_the_name)" }
  }
  if ([TYPE] == "INFO")
  {
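    # drop every INFO line except 000003, 000005, "info BDD" and "CODE RETOUR"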
    if ([ID2] != "000003" and [ID2] != "000005")
    {
      if ([DESCRIPTION] !~ "info BDD" and [DESCRIPTION] !~ "CODE RETOUR")
      {
        drop { }
      }
    }
  }
  date
  {
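    # use the yyyyMMdd batch date as the event @timestamp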
    match => [ "DATE_BATCH", "yyyyMMdd" ]
  }
  mutate
  {
    remove_field => [ "@version","ID1","_id","_index","_score","_type","beat.hostname","beat.name","beat.version","filetype","host","offset","prospector.type" ]
    convert => { "CODE_RETOUR" => "integer" }
  }
}
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "dededlastone"
  }
  stdout { codec => rubydebug }
}

Thank you in advance for your answers :smiley:

Can somebody help me, please?
