Hello ELK community.
I use Logstash and Filebeat to parse files that are contained in a directory.
For example, one of my log files looks like this:
INFO;0000;000001;***************************************************************
INFO;0000;000002;* LOG D'EXECUTION *
INFO;0000;000003;* /data/EDT/batchs/files/logs/REG023MT-20180612-20180612-1940.log *
INFO;0000;000004;* 2018-06-12 19:41:11:117 *
INFO;0000;000005;* REG023MT - Générer Décaissement par virement *
INFO;0000;000006;***************************************************************
INFO;0000;000007;
INFO;0000;000016; Version EDK : 4.131.500 (build : 1561e01, date : 02-05-2018 14:58:47)
INFO;0000;000017; Version ecore : 4.140.500 (build : 3eef259, date : 03-05-2018 15:49:45)
INFO;0000;000018; Utilisateur Oracle : HERMES_USER
INFO;0000;000019; info BDD : 13.13.100 / UEM / METZ
INFO;0000;000020;
INFO;0000;000021; paramètres d'exécution ajustables:
INFO;0000;000022; nombre de threads max : 8
INFO;0000;000023; taille des lots techniques : 10
INFO;0000;000024; commit actif : true
INFO;0000;000025; type de transaction : transaction au fil de l'eau
INFO;0000;000026; utilisation truncate pour vider les tables batch : false
INFO;0000;000027;
INFO;0000;000028;
INFO;0000;000029; -----------------------------------------------------------------------
INFO;0000;000030; - Traitement de l'entreprise "energem achat"
INFO;0000;000031; -----------------------------------------------------------------------
INFO;0000;000032;
INFO;0000;000033;
INFO;0000;000187; >>>>>>>>>>> Informations diverses
INFO;0000;000188; Référence :BD1206201800070
INFO;0000;000189; Banque :BPALC 31421257010
INFO;0000;000190; Montant :-8 466,7
INFO;0000;000191; Nombre d'élément(s) :95
INFO;0000;000192; Date de constitution :12/06/2018
INFO;0000;000193;
INFO;0000;000194;
TECH;P001;000195; erreur technique lors de l'exécution du job principal
TECH;T003;000196; erreur imprévue lors de l'exécution du job (GenererDecaissementPreleverOffreProduitJob_
TECH;T003;000197; 10_39)
TECH;T003;000198; Exception levée : java.lang.NullPointerException
TECH;T003;000199; Message :
TECH;T003;000200; Trace :
TECH;T003;000201; java.lang.String.compareTo(String.java:1155)
TECH;T003;000202; com.hermes.rec.reglement.businessprocess.PerceptionProcess.estClientRattacheAu
TECH;T003;000203; PercepteurParCodesEtablissementEtGuichet(PerceptionProcess.java:46)
TECH;T003;000204; com.hermes.rec.reglement.businessprocess.PerceptionProcess.estClientRattacheAu
TECH;T003;000205; PercepteurParCodesEtablissementEtGuichet(PerceptionProcess.java:40)
TECH;T003;000206; com.hermes.rec.reglement.businessprocess.PerceptionProcess.isClientRattachePer
TECH;T003;000207; cepteur(PerceptionProcess.java:30)
TECH;T003;000208; com.hermes.rec.reglement.businessprocess.GenererDecaissementProcess.creationOp
TECH;T003;000247; com.hermes.rec.reglement.businessprocess.PerceptionProcess.isClientRattachePer
TECH;T003;000248; cepteur(PerceptionProcess.java:30)
TECH;T003;000249; com.hermes.rec.reglement.businessprocess.GenererDecaissementProcess.creationOp
TECH;T003;000250; eration(GenererDecaissementProcess.java:104)
TECH;T003;000251; com.hermes.rec.reglement.businessprocess.GenererOperationFinanciereProcess.cre
TECH;T003;000252; ationOperationFinanciereMultipleParCompte(GenererOperationFinanciereProcess.ja
TECH;T003;000253; va:611)
INFO;0000;000445; traitement de consolidation : 0h 0' 0" 3ms
INFO;0000;000446;
INFO;0000;000447;
INFO;0000;000448; TEMPS D'EXECUTION : 0h 0' 18" 947ms
INFO;0000;000449;
INFO;0000;000450;CODE RETOUR : -8
INFO;0000;000451;
Then, with Logstash, I parse this file to keep only:
- Line 3 (/data/...)
- Line 5 (REG023...)
- Line 19 (info BDD ...)
- All "TECH" [first column] lines.
- Line which contains "CODE RETOUR" at the end of the file.
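For example, the "CODE RETOUR" line ends up roughly like this in Elasticsearch (only the fields I care about are shown; NOM_BATCH and DATE_BATCH are copied over from line 3 by the ruby filters in my pipeline below, and @timestamp is set from DATE_BATCH by the date filter):

  {
         "TYPE" => "INFO",
          "ID2" => "000450",
  "DESCRIPTION" => "CODE RETOUR : -8",
  "CODE_RETOUR" => -8,
    "NOM_BATCH" => "REG023MT",
   "DATE_BATCH" => "20180612"
  }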
So my question is: rather than keeping all the TECH lines, can I keep only, say, the first 5 of them? The information that interests me is in those first 5 lines and I would like to drop the rest (I sketched what I had in mind below, after my pipeline).
My pipeline looks like this:
input
{
  beats
  {
    port => 5044
  }
}

filter
{
  # split each line into its 4 columns: type, code, line number, description
  grok
  {
    match => { "message" => [ "%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}" ] }
  }

  # extract the numeric return code from the "CODE RETOUR" line
  if ([DESCRIPTION] =~ "CODE")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_RETOUR}" ] }
    }
  }

  # line 000003 contains the log file path: extract the batch name and date,
  # and remember them so they can be copied onto the following events
  if ([ID2] == "000003")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{WORD:NOM_BATCH}-%{BASE16NUM:DATE_BATCH}" ] }
    }
    ruby { code => "@@save_the_date = event.get('DATE_BATCH')" }
    ruby { code => "@@save_the_name = event.get('NOM_BATCH')" }
  }
  else
  {
    ruby { code => "event.set('DATE_BATCH', @@save_the_date)" }
    ruby { code => "event.set('NOM_BATCH', @@save_the_name)" }
  }

  # keep only the INFO lines I am interested in, drop the rest
  if ([TYPE] == "INFO")
  {
    if ([ID2] != "000003" and [ID2] != "000005")
    {
      if ([DESCRIPTION] !~ "info BDD" and [DESCRIPTION] !~ "CODE RETOUR")
      {
        drop { }
      }
    }
  }

  date
  {
    match => [ "DATE_BATCH", "yyyyMMdd" ]
  }

  mutate
  {
    remove_field => [ "@version","ID1","_id","_index","_score","_type","beat.hostname","beat.name","beat.version","filetype","host","offset","prospector.type" ]
    convert => { "CODE_RETOUR" => "integer" }
  }
}

output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "dededlastone"
  }
  stdout { codec => rubydebug }
}
Thank you for your answers.