Duplicated Log

Hello Guys,

My firewall device is sending duplicated log. I tried filter with logstash.
I need write the output in file.

I test with logger but don´t work, I tried use fingerprint.

Can you have any ideia ?

    input {
      udp {
         port => "6514"
         type => "syslog"
       }
     }
 
 
     filter {
       fingerprint {
         source => "message"
         #target => "[@metadata][fingerprint]" ### fail
         target => "[@message][fingerprint]"
         method => "SHA1"
         key => "key"
         base64encode => true
       }
     }
 
     output {
       file {
         path => "/var/log/logstash/firewall_test.log"
       }
     }

This is the OUTPUT ( I test using logger ):

{"type":"syslog","@version":"1","@timestamp":"2021-04-12T18:31:46.968Z","message":"<5>Apr 12 12:31:46 archsight: Feb 12 12:12:12 device_pc111\u0000","host":"127.0.0.1"}
{"type":"syslog","@version":"1","@timestamp":"2021-04-12T18:31:46.972Z","message":"<5>Apr 12 12:31:46 archsight: Feb 12 12:12:12 device_pc111\u0000","host":"127.0.0.1"}

The output does not contain the fingerprint that your filter would add. That very much suggests that you are not running the configuration you think you are running. What are you setting path.config to? How are you starting logstash?

My path.config is default in pipelines.yml.

path.config: "/etc/logstash/conf.d/*.conf" 

Can I use the another module ? like UDP ?

I need remove the duplicated log and tranfer syslog (just message). I think the best way is use the udp module. I tried but it´s the same problem.

filter {
       fingerprint {
         source => "message"
         #target => "[@metadata][fingerprint]" ### fail
         target => "[@message][fingerprint]"
         method => "SHA1"
         key => "key"
         base64encode => true
       }
     }

output {
  udp {
        host => "192.168.10.10"
        port => "514"
        codec => line { format => "%{message}"}
}

If you have a single file output in a configuration file and two copies of the message get written to it then it is very likely you have two configuration files that contain that output.

It is a very common misunderstanding that if you have multiple configuration files they are run independently, but that is not the case unless you are using pipelines.yml. If path.config matches multiple files they are concatenated, events are read from all of the inputs, run through all the filters, and written to all of the events. If two configuration files have the same file output the event will be written twice.

A common way of getting hit by this is to point path.config to a directory, such as /etc/logstash/conf.d/. It will then gather up all the files (e.g. myConfig.conf, myConfig.conf.bak) and combine them.

By the way, if you have two syslog inputs on the same port one of them should be logging an error saying "address already in use".

I understand, but the problem is the FIREWALL send duplicate log´s.

I tried use logstash to avoid this problem. I don´t use the ELK I use another SIEM. So my ideia is filter in LOgstash and send to my syslog.

I install just Logsthash in this device.

If you were writing to elasticsearch you could use fingerprint to set the document id so that duplicate records would overwrite the original.

The best I can think of in logstash is to maintain an array of recently seen fingerprints. Here it is just 10, but you could increase that.

    fingerprint { source => "message" target => "[@metadata][fingerprint]" method => "SHA256" }
    ruby {
        init => '@prints = []; @prints[9] = nil' # Sets 10 entries to nil
        code => '
            print = event.get("[@metadata][fingerprint]")
            if @prints.include? print
                event.cancel
            else
                @prints.shift       # Drop first entry
                @prints.push print  # Append at end
            end
        '
    }
1 Like

Thanks a great idea.

I tried but don´t work. The file was write.

I think put variable to resolve this. So the Output is write just when the fingerprint is validated..... But I don´t know how to put variable ... I tried ...

Check => TRUE
add_field => { "CHECK" => "TRUE"}

Do you have any suggestion ?

filter{
        fingerprint {
                source => "message"
                target => "[@metadata][fingerprint]"
                method => "SHA256"
                }
        ruby {
                init => '@prints = []; @prints[9] = nil' # Sets 10 entries to nil
                code => '
                print = event.get("[@metadata][fingerprint]")
                if @prints.include? print
                        event.cancel
                        add_field => { "CHECK" => "TRUE"}
                else
                        @prints.shift       # Drop first entry
                        @prints.push print  # Append at end
                        add_field => {"CHECK" => "FALSE"}
                end
                '
        }
}


output {
        if [CHECK] == "FALSE" {
        file {
                path => "/var/log/logstash/firewall_test.log"
                codec => line { format => "%{message}"}
        }
}

You cannot use add_field there, you would have to

event.set("CHECK", "TRUE")

but event.cancel has been called at that point, so the event should not be processed any further down the pipeline.

I forgot to mention that you need to set --pipeline.workers 1

1 Like

Thanks man,

Work in my test.
I will received about 4 thousand events/second.
Do you recommend another changer ? or just increase this Array ?

The array needs to be big enough that the duplicate message arrives before the print of the first version gets shifted out of the array. So you will very likely have to increase the size of the array.

I do not know how the cost of .shift and .push change with the size of the array.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.