Remove duplicates in logstash logs


(bus) #1

Hello, I use filebeat to send logs to logstash and then I use Logstash to parse them. For exemple this line :

31/5/2018 01:06:24.073 (TACHE) 30/5/2018/D T UE_TECXXX_J_TN_SSH002_DELCLONE_LPSACAS1(14033)/UE_TECXXX_X_LL_OPE003_TECHNIQUE TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)

Become after groke parse :

{
        "DATE_TACHE" => "30/5/2018"
       "HEURE_TACHE" => "01:06:24.073"
         "NOM_TACHE" => "UE_TECXXX_J_TN_SSH002_DELCLONE_LPSACAS1"
"LOCALISATION_TACHE" => "UE_TECXXX_X_LL_OPE003_TECHNIQUE"
 "CODE ERREUR TACHE" => "0"
}

And sometime, the same task (tache in french) is done several times, but not at the same moment. That's why I would like to know if I had possibility to keep the first event of the couple NOM_TACHE/LOCALISATION_TACHE and drop every other same identic couple at this one

For exemple, after the first exemple above, this task should be dropped :

{
        "DATE_TACHE" => "30/5/2018"
       "HEURE_TACHE" => "07:16:24.143"
         "NOM_TACHE" => "UE_TECXXX_J_TN_SSH002_DELCLONE_LPSACAS1"
"LOCALISATION_TACHE" => "UE_TECXXX_X_LL_OPE003_TECHNIQUE"
 "CODE ERREUR TACHE" => "252"
}

I'm completly stuck on this problem...

Can somebody help me ?

Thx for all.

Here my pipeline :

### INPUT SECTION ###
input
{
  beats
  {
    port => 5044
  }
}



### FILTER SECTION ###
filter
{
  grok
  {
    match => { "message" => [ "%{DATE_EU:DATE_LOG} %{TIME:HEURE_TACHE} \(%{WORD:TYPE_TACHE}\) %{DATE_EU:DATE_TACHE}/D . %{WORD:NOM_TACHE}\(%{NUMBER:ID_TACHE}\)/%{WORD:LOCALISATION} (?<STATUS>[A-Z]\w++\s+[A-Z]\w+) : %{WORD:CODE_TACHE} %{GREEDYDATA:DESCRIPTION}" ] }
  }
  if ([message] =~ "CODE")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_ERREUR_TACHE}" ] }
    }
  }
  mutate
  {
    remove_field => [ "@version","CODE_TACHE","DATE_LOG","ID_TACHE","STATUS","TYPE_TACHE","_id","_index","_score","_type","beat.hostname","beat.name","beat.version","filetype","host","offset","prospector.type","tags" ]
    convert => { "CODE_ERREUR_TACHE" => "integer" }
  }
  if [message] !~ "réalisées" and [message] !~ "CODE"
  {
    drop { }
  }
  if [message] =~ "groupe"
  {
    drop { }
  }
  date
  {
    match => [ "DATE_TACHE", "dd/MM/yyyy" ]
  }
}

### OUTPUT SECTION ###
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "vegaaprodd"
  }
stdout { codec => rubydebug }
}

#2

If you are OK with saving the last instead of the first then you can use the fingerprint filter to generate an id based on your choice of fields, then set the document_id to that id in the elasticsearch output. This will cause the documents to be overwritten when a new event with the same fields arrives.


(bus) #3

Hey @Badger,

Thx for your reply, i'll try it fastly and come back to you if i've got questions ! :slight_smile:


(bus) #4

@Badger

I tried to do what you said, but it doesn't work.

I modified my pipeline and add "fingerprint" section into "filter" section which looks like : (I think config is correct, I want to hash with DATE,NOM & LOCALISATION each event)

fingerprint
  {
    source => "%{[DATE_TACHE][NOM_TACHE][LOCALISATION_TACHE]}"
    target => "fingerprint_id"
    concatenate_sources => "true"
    method => "MURMUR3"
  }

And add a "document_id" field into output section

output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "testdoublon"
    document_id => "%{[fingerprint_id]}"
  }

I tried with 8 logs lines to test (On these 8, 5 are duplicates and 3 are unique. So I should have 4 events into elasticsearch... But I only have 1 "docs.count" and 7 "docs.deleted" :

health status index       uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   testdoublon 87WQvbi1R1ml6ih1nM_QMA   5   1          1            7     18.2kb         18.2kb

I don't know why... Theses 8 lines are (better with copy/paste and notepad++ :slight_smile: )

31/05/2018 07:26:52.720 (TACHE) 31/05/2018/D T UE_VGAXXX_J_TN_VGA002_SAV_LOGBDD_VEGA(26)/PR_LWXXX_TECVGAXXX TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 07:26:52.766 (TACHE) 31/05/2018/D T OR_WKFXXX_J_TN_WKF998MT_CALC_STAT_JOUR(15805)/OR_XXXXXX_X_LL_POC001_PROTOPROD TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 07:28:19.013 (TACHE) 31/05/2018/D T XX_TECXXX_J_TN_TST001_TEST_AGENT(13423)/MA_NETXXX_X_LL_OPE001_EFLUIDNET TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 07:30:35.950 (TACHE) 31/05/2018/D T UE_TECXXX_J_TN_MOV001_PREVENERCOM_2_ARCH(11968)/PR_LWXXX_TECVGAXXX TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 07:41:18.663 (TACHE) 31/05/2018/D T UE_TECXXX_J_TN_MOV001_PREVENERCOM_2_ARCH(11968)/PR_LWXXX_TECVGAXXX TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 07:51:18.663 (TACHE) 31/05/2018/D T UE_TECXXX_J_TN_MOV001_PREVENERCOM_2_ARCH(11968)/PR_LWXXX_TECVGAXXX TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 08:41:18.663 (TACHE) 31/05/2018/D T UE_TECXXX_J_TN_MOV001_PREVENERCOM_2_ARCH(11968)/PR_LWXXX_TECVGAXXX TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)
31/05/2018 09:12:18.663 (TACHE) 31/05/2018/D T UE_TECXXX_J_TN_MOV001_PREVENERCOM_2_ARCH(11968)/PR_LWXXX_TECVGAXXX TER STATUS : TN Terminaison normale de la tâche (TN EXIT CODE 0)

My new pipeline :

### INPUT SECTION ###
input
{
  beats
  {
    port => 5044
  }
}



### FILTER SECTION ###
filter
{
  grok
  {
    match => { "message" => [ "%{DATE_EU:DATE_LOG} %{TIME:HEURE_TACHE} \(%{WORD:TYPE_TACHE}\) %{DATE_EU:DATE_TACHE}/D . %{WORD:NOM_TACHE}\(%{NUMBER:ID_TACHE}\)/%{WORD:LOCALISATION} (?<STATUS>[A-Z]\w++\s+[A-Z]\w+) : %{WORD:CODE_TACHE} %{GREEDYDATA:DESCRIPTION}" ] }
  }
  if ([message] =~ "CODE")
  {
    grok
    {
      match => { "DESCRIPTION" => [ "%{NUMBER:CODE_ERREUR_TACHE}" ] }
    }
  }
  mutate
  {
    remove_field => [ "@version","CODE_TACHE","DATE_LOG","ID_TACHE","STATUS","TYPE_TACHE","_id","_index","_score","_type","beat.hostname","beat.name","beat.version","filetype","host","offset","prospector.type","tags" ]
    convert => { "CODE_ERREUR_TACHE" => "integer" }
  }
  if [message] !~ "réalisées" and [message] !~ "CODE"
  {
    drop { }
  }
  if [message] =~ "groupe"
  {
    drop { }
  }
  date
  {
    match => [ "DATE_TACHE", "dd/MM/yyyy" ]
  }
  fingerprint
  {
    source => "%{[DATE_TACHE][NOM_TACHE][LOCALISATION_TACHE]}"
    target => "[fingerprint_id]"
    concatenate_sources => "true"
    method => "MURMUR3"
  }
}

### OUTPUT SECTION ###
output
{
  elasticsearch
  {
    hosts => "http://localhost:9200"
    index => "testdoublon"
    document_id => "%{[fingerprint_id]}"
  }
stdout { codec => rubydebug }
}

Thx for all


(system) #5

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.