Exclude a file based on a value inside this file


(bus) #1

Hey,

I watch several files in a directory with Filebeat and then parse the ones that interest me. For example:

This file doesn't interest me because its "CODE RETOUR" value is 0 (last line):

INFO;0000;000002;*                                                LOG D'EXECUTION                                                 *
INFO;0000;000003;*                        /data/EDT/batchs/files/logs/WKF998MT-20180618-20180618-1302.log                         *
INFO;0000;000004;*                                            2018-06-18 13:03:09:420                                             *
INFO;0000;000005;*                                       WKF998MT - calcul des statistiques                                       *
INFO;0000;000008;   le champ "FETCH_SIZE_VALUE" n'existe pas dans le fichier de propriétes du batch. utilisati
INFO;0000;000009;   on de la valeur par défaut.
INFO;0000;000010;
INFO;0000;000011;   Version de l'application : 13.13.200 (build : 149df21, date : 01-06-2018 17:02:30)
INFO;0000;000012;      Version de l'architecture : 4.143.500 (build : 879ab1c, date : 30-04-2018 09:42:03)
INFO;0000;000013;      Version du framework : 4.143.500 (build : 879ab1c, date : 30-04-2018 09:42:03)
INFO;0000;000014;      Version EDK : 4.131.500 (build : 1561e01, date : 02-05-2018 14:58:47)
INFO;0000;000015;      Version ecore : 4.140.500 (build : 3eef259, date : 03-05-2018 15:49:45)
INFO;0000;000016;   Utilisateur Oracle : HERMES_USER
INFO;0000;000017;   info BDD : 13.13.200 / UEM / METZ
INFO;0000;000574;      Calcul des statistiques de campagnes : 
INFO;0000;000575;      Exécution OK
INFO;0000;000576;      Temps d'exécution: 0h 0' 0" 15ms ms
INFO;0000;000580;   TEMPS D'EXECUTION : 0h 0' 15" 255ms
INFO;0000;000582;CODE RETOUR : 0

This file interests me because its "CODE RETOUR" value is not 0:

INFO;0000;000003;*                        /data/EDT/batchs/files/logs/MNS014MT-20180612-20180613-0105.log                         *
INFO;0000;000005;*                    MNS014MT - Prélèvement et validation d'échéancier suite à saisie de RIB                     *
INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ
MNOR;3011;000036;   Erreur de traitement d'un élément
MNOR;3012;000037;      Erreur lors de la mise en transaction
MNOR;4009;000038;         aucune ligne de compte à mettre en transaction (compte:625316)
INFO;0000;000096;CODE RETOUR : -4

But how can I drop a file that contains "CODE RETOUR : 0"?

I'm currently able to parse my files and drop the lines that don't interest me, but I'm still unable to handle the files that contain a "CODE RETOUR : 0" value.

I need to find a way to read the file, save the "CODE RETOUR" value in a variable, add this variable to every line, and then use a filter to drop all lines where CODE_RETOUR == "0".
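
The idea in the paragraph above can be sketched in plain Ruby, outside Logstash. This is only an illustration under assumptions: the helper names `extract_code_retour` and `tag_lines` are hypothetical, and the sketch assumes all lines of one file are available together, which is not the case when each line arrives as a separate event.

```ruby
# Hypothetical helpers, for illustration only (not part of Logstash or Filebeat).

# Return the integer that follows "CODE RETOUR :" in the given lines, or nil if absent.
def extract_code_retour(lines)
  lines.each do |line|
    m = line.match(/CODE RETOUR\s*:\s*(-?\d+)/)
    return m[1].to_i if m
  end
  nil
end

# Attach the file-level return code to every line of that file.
def tag_lines(lines)
  code = extract_code_retour(lines)
  lines.map { |line| { 'message' => line, 'CODE_RETOUR' => code } }
end

ok_file = [
  "INFO;0000;000575;      Exécution OK",
  "INFO;0000;000582;CODE RETOUR : 0"
]
ko_file = [
  "MNOR;3011;000036;   Erreur de traitement d'un élément",
  "INFO;0000;000096;CODE RETOUR : -4"
]

# Keep only the events whose file-level code is not 0.
kept = (tag_lines(ok_file) + tag_lines(ko_file)).reject { |e| e['CODE_RETOUR'] == 0 }
puts kept.map { |e| e['message'] }
```

The difficulty in a real pipeline is that the return code sits on the last line of the file, so the earlier lines have already been processed by the time it is known.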

..

My pipeline:

input
{
  beats
  {
    port => 5044
  }
}
filter
{
  grok
  {
    match => { "message" => [ "%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}" ] }
  }
  if ([DESCRIPTION] =~ "CODE")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_RETOUR}" ] }
    }
  }
  if ([ID2] == "000003")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{WORD:NOM_BATCH}-%{BASE16NUM:DATE_BATCH}" ] }
    }
    ruby { code => "@@save_the_date = event.get('DATE_BATCH')" }
    ruby { code => "@@save_the_name = event.get('NOM_BATCH')" }
  }
  else
  {
    ruby { code => "event.set('DATE_BATCH', @@save_the_date)" }
    ruby { code => "event.set('NOM_BATCH', @@save_the_name)" }
  }
  if ([TYPE] == "INFO")
  {
    if ([ID2] != "000003" and [ID2] != "000005")
    {
      if ([DESCRIPTION] !~ "info BDD" and [DESCRIPTION] !~ "CODE RETOUR")
      {
        drop { }
      }
    }
  }
  if "_grokparsefailure" in [tags] 
  {
    drop { }
  }
  date
  {
    match => [ "DATE_BATCH", "yyyyMMdd" ]
  }
  mutate
  {
    remove_field => [ "@version","ID1","_id","_index","_score","_type","beat.hostname","beat.name","beat.version","filetype","host","offset","prospector.type" ]
    convert => { "CODE_RETOUR" => "integer" }
  }
}
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "essai"
  }
  stdout { codec => rubydebug }
}

I don't know whether something like this can be implemented in the Filebeat configuration file:

processors:
 - drop_file:
     when:
        regexp:
           message: "^CODE RETOUR : 0:"

Thanks for everything.


(bus) #2

Can somebody help me? @Badger, any idea? :slight_smile:


#3

It looks like you want to read the whole of a file as a single event. It is something that filebeat and logstash do not support well.

In logstash, with a file input, you can use a multiline codec to do it. Use a pattern that does not occur in the file, as well as auto_flush_interval.

codec => multiline { pattern => "Spalanzani" negate => true what => "previous" auto_flush_interval => 2 }

The filebeat equivalent would be

multiline.pattern: Spalanzani
multiline.negate: true
multiline.match: before
multiline.timeout: 2

Of course you then have to start over from scratch with your parsing :slight_smile:
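
To see why a pattern that never occurs yields one event per file, here is a simplified Ruby model of the grouping rule for negate => true and what => "previous". The helper `group_multiline` is hypothetical and is not the codec's real implementation: it ignores timeouts, auto flush and max_lines.

```ruby
# Simplified model of multiline grouping with negate => true and what => "previous":
# a line that does NOT match the pattern is appended to the previous event.
# Hypothetical helper, for illustration only.
def group_multiline(lines, pattern)
  events = []
  lines.each do |line|
    if line =~ pattern || events.empty?
      events << [line]      # a matching line (or the very first line) starts a new event
    else
      events.last << line   # a non-matching line joins the previous event
    end
  end
  events.map { |e| e.join("\n") }
end

lines = [
  "INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ",
  "MNOR;3011;000036;   Erreur de traitement d'un élément",
  "INFO;0000;000096;CODE RETOUR : -4"
]

# "Spalanzani" never occurs, so every line is glued onto the first one.
events = group_multiline(lines, /Spalanzani/)
puts events.length
```

Since nothing ever starts a new event, the real codec relies on the flush interval (or timeout, in filebeat) to decide that the file is complete.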


(bus) #4

OK, but in my case it is a Filebeat (beats) input and not a file input.

So do I have to use my current Filebeat log directory as the file input path?

For example, now I have:

input
{
  beats
  {
    port => 5044
    #filebeat.yml prospector path : /etc/myPath/*.vld
  }
}

And I have to change it to:

input {
  file {
    path => "/etc/myPath/*.vld"
  }
}

Is that it?

Thank you @Badger


#5

No, you can continue using filebeat if you want to. I gave the multiline configuration that you would need to use in my last post.


(bus) #7

OK, using the file input plugin I'm now able to send my entire file as one event (but not with Filebeat?). I had misread your sentence "Use a pattern that does not occur in the file as well as auto_flush_interval".

So I did:

input {
  file {
    path => "/home/log/GDA/*.log"
    codec => multiline { pattern => "Spalanzani" negate => true what => "previous" auto_flush_interval => 1 max_lines => 4000 }
  }
}

But my file is sent twice instead of once. I don't know why.

health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   test 9dEe2jg1SPiiwbEuVsJgVQ   5   1          2            0     87.5kb         87.5kb

My pipeline (my filter lets me drop both false-positive files and normal files):

input {
  file {
    path => "/home/log/GDA/*.log"
    codec => multiline { pattern => "Spalanzani" negate => true what => "previous" auto_flush_interval => 1 max_lines => 4000 }
  }
}
filter
{
 #DROP NORMAL FILES
 if [message] =~ "CODE RETOUR : 0"
 {
   drop { }
 }
 #DROP FALSE POSITIVE FILES
 if [message] =~ "CODE RETOUR : -4"
 {
   if [message] !~ "MNOR" and [message] !~ "TECH" and [message] !~ "FTAL"
   {
     drop { }
   }
 }
}
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "test"
  }
  stdout { codec => rubydebug }
}
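
The two conditionals of this filter boil down to a single yes/no decision per file. A minimal Ruby sketch of that decision, assuming the whole file is in one string; `drop_file?` and `ERROR_TYPES` are hypothetical names, and the patterns are unanchored just like the ones in the filter:

```ruby
# Hypothetical helper mirroring the two conditionals of the filter, for illustration only.
ERROR_TYPES = /MNOR|TECH|FTAL/

def drop_file?(message)
  # Normal file: the batch finished with return code 0.
  return true if message =~ /CODE RETOUR : 0/
  # False positive: return code -4 but no MNOR, TECH or FTAL line in the file.
  return true if message =~ /CODE RETOUR : -4/ && message !~ ERROR_TYPES
  false
end

normal = "INFO;0000;000575;      Exécution OK\nINFO;0000;000582;CODE RETOUR : 0"
failed = "MNOR;3012;000037;      Erreur lors de la mise en transaction\nINFO;0000;000096;CODE RETOUR : -4"

puts drop_file?(normal)
puts drop_file?(failed)
```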

PS: I can remove this duplicate with the fingerprint method, but is it normal that the file is sent twice?
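
For reference, the fingerprint approach amounts to deriving the document id from the event content, so that indexing the same content twice overwrites instead of duplicating. A rough Ruby sketch of the principle, where a Hash stands in for the index and the helpers `document_id` and `index_events` are hypothetical (in Logstash this would be done with the fingerprint filter and the document_id option of the elasticsearch output):

```ruby
require 'digest'

# Hypothetical helper: a stable id computed from the event content.
def document_id(message)
  Digest::SHA1.hexdigest(message)
end

# A Hash stands in for the index: writing the same id twice overwrites the entry.
def index_events(messages)
  messages.each_with_object({}) { |msg, index| index[document_id(msg)] = msg }
end

file_content = "INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ\nINFO;0000;000096;CODE RETOUR : -4"

# The same file sent twice ends up as a single entry.
index = index_events([file_content, file_content])
puts index.size
```

This hides the duplicate but does not explain why the file is sent twice.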

PS: This method works very well for dropping my files, but is it still possible to parse this very long "message" field as I did before? You can check the filter in my first post.


.
.
.
until INFO;0000;000761 for this file ^^ very long event :slight_smile:

My grok could now be:

%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}
%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}
%{WORD:TYPE};%{DATA:ID1};%{NUMBER:ID2};%{GREEDYDATA:DESCRIPTION}
.
.
.

But I don't think that is the best way.


#8

Getting two copies of the file is not normal. Are you certain it is not 2 different files?

Looking at the filter in the first post, you appear to want data from a couple of lines. I would do that using something like this

      grok { match => [ "message", "^%{WORD:TYPE};%{NUMBER:ID1};%{NUMBER:ID2};%{SPACE}info BDD %{DATA:DESCRIPTION}
" ] }

Or perhaps even this, which looks for a line that has info BDD in it, followed by multiple characters that are not newline, followed by newline.

        grok { match => [ "message", "^%{WORD:TYPE};%{NUMBER:ID1};%{NUMBER:ID2};(?<restOfLine>[[:space:]]+info BDD[^
]+)
" ] }
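
The same idea can be tried outside Logstash with a plain Ruby regex, which is convenient for testing the pattern against a multiline string. This is an approximation, not the grok pattern itself: the constant `INFO_BDD` and the helper `parse_info_bdd` are hypothetical, and `\w+`, `\d+` and `\s*` stand in for %{WORD}, %{NUMBER} and %{SPACE}.

```ruby
# In Ruby regexes ^ and $ match at every line boundary, so the pattern can pick
# one line out of a multiline message. Hypothetical names, for illustration only.
INFO_BDD = /^(?<type>\w+);(?<id1>\d+);(?<id2>\d+);\s*info BDD (?<description>[^\n]*)$/

def parse_info_bdd(message)
  m = INFO_BDD.match(message)
  m && { 'TYPE' => m[:type], 'ID1' => m[:id1], 'ID2' => m[:id2], 'DESCRIPTION' => m[:description] }
end

message = [
  "INFO;0000;000018;   Utilisateur Oracle : HERMES_USER",
  "INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ",
  "INFO;0000;000096;CODE RETOUR : -4"
].join("\n")

p parse_info_bdd(message)
```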

(bus) #9

Thank you for the answer @Badger, but there is something I don't understand:

With this file input and multiline codec, my whole file is now read as a single event with one big [message] field. Fine. But now I would like to parse this big message and keep only 5 lines, as I did before.

For example, this is my file:

INFO;0000;000001;******************************************************************************************************************
INFO;0000;000002;*                                                LOG D'EXECUTION                                                 *
INFO;0000;000003;*                        /data/EDT/batchs/files/logs/MNS014MT-20180612-20180613-0105.log                         *
INFO;0000;000004;*                                            2018-06-13 01:05:43:448                                             *
INFO;0000;000005;*                    MNS014MT - Prélèvement et validation d'échéancier suite à saisie de RIB                     *
INFO;0000;000006;******************************************************************************************************************
INFO;0000;000015;      Version du framework : 4.143.500 (build : 879ab1c, date : 30-04-2018 09:42:03)
INFO;0000;000016;      Version EDK : 4.131.500 (build : 1561e01, date : 02-05-2018 14:58:47)
INFO;0000;000017;      Version ecore : 4.140.500 (build : 3eef259, date : 03-05-2018 15:49:45)
INFO;0000;000018;   Utilisateur Oracle : HERMES_USER
INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ
INFO;0000;000032;   Traitement de l'offre 'fixo élec énergem' (id : BT$3WAp)
INFO;0000;000033;   Nombre d'elts traités : 1. Nombre d'elts en erreur : 0
INFO;0000;000034;
INFO;0000;000035;   Traitement de l'offre 'fixo gaz uem' (id : G$3XVK)
MNOR;3011;000036;   Erreur de traitement d'un élément
MNOR;3012;000037;      Erreur lors de la mise en transaction
MNOR;4009;000038;         aucune ligne de compte à mettre en transaction (compte:625316)
INFO;0000;000039;   
INFO;0000;000094;   TEMPS D'EXECUTION : 0h 1' 48" 216ms
INFO;0000;000095;
INFO;0000;000096;CODE RETOUR : -4
INFO;0000;000097;
INFO;0000;000098;******************************************************************************************************************

The entire content of this file is the [message] field in Kibana.

And I would like my file to look like this:

INFO;0000;000003;*                        /data/EDT/batchs/files/logs/MNS014MT-20180612-20180613-0105.log                         *
INFO;0000;000005;*                    MNS014MT - Prélèvement et validation d'échéancier suite à saisie de RIB                     *
INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ
MNOR;3011;000036;   Erreur de traitement d'un élément
MNOR;3012;000037;      Erreur lors de la mise en transaction
MNOR;4009;000038;         aucune ligne de compte à mettre en transaction (compte:625316)
INFO;0000;000096;CODE RETOUR : -4

Only 4 INFO lines interest me (line 3, line 5, the "info BDD" line, and the "CODE RETOUR" line), plus all lines whose type is not "INFO".

That's why I'm asking how to keep only these lines in my big new [message] field, or how to create 5 events, one per line, as I did before with the Filebeat input.

Do you understand what I mean?

Thanks for everything @Badger


#10

I think being able to drop files with CODE RETOUR zero is a good enough reason to keep the multiline codec. However, for the rest of the processing you will want to split those lines up. The final filtering we can do in ruby, which always feels like it is the wrong solution, but it does basically work.

    mutate {
        # Split the message into one array element per line; the separator is a literal newline.
        split => { "message" => "
" }
    }
    ruby {
        code => "
            # Keep every non-INFO line, INFO lines 000003 and 000005,
            # and the 'info BDD' and 'CODE RETOUR' lines. Each line is kept at most once.
            interestingLines = event.get('message').select { |x|
                !x.start_with?('INFO') ||
                    x =~ /^INFO;0000;00000(3|5)/ ||
                    x =~ /CODE RETOUR|info BDD/
            }
            event.set('interestingLines', interestingLines)
        "
    }

will result in

"interestingLines" => [
    [0] "INFO;0000;000003;*                        /data/EDT/batchs/files/logs/MNS014MT-20180612-20180613-0105.log                         *",
    [1] "INFO;0000;000005;*                    MNS014MT - Prélèvement et validation d'échéancier suite à saisie de RIB                     *",
    [2] "INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ",
    [3] "MNOR;3011;000036;   Erreur de traitement d'un élément",
    [4] "MNOR;3012;000037;      Erreur lors de la mise en transaction",
    [5] "MNOR;4009;000038;         aucune ligne de compte à mettre en transaction (compte:625316)",
    [6] "INFO;0000;000096;CODE RETOUR : -4"
],

At that point, if you really want separate events you can blow the array apart using a split filter (not the split function in mutate).

If you want one big message field you can merge the members of the array using the join function of the mutate filter.
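
The difference between the two options can be shown on a plain Ruby Hash standing in for an event. The helpers `split_events` and `join_field` are hypothetical and only model the outcome, not how the split filter or mutate are implemented.

```ruby
# Hypothetical helpers, for illustration only.

# One event per array member: every other field is copied into each new event.
def split_events(event, field)
  event[field].map { |value| event.merge(field => value) }
end

# One event whose field is the array members joined into a single string.
def join_field(event, field, separator = "\n")
  event.merge(field => event[field].join(separator))
end

event = {
  'path' => '/home/log/GDA/example.log', # hypothetical file name
  'interestingLines' => [
    "INFO;0000;000019;   info BDD : 13.13.100 / UEM / METZ",
    "INFO;0000;000096;CODE RETOUR : -4"
  ]
}

p split_events(event, 'interestingLines')
p join_field(event, 'interestingLines')
```

Separate events make it possible to run the per-line grok from the first post again, while the joined form keeps one document per file.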


(system) #11

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.