Elasticsearch gets a huge amount of duplicated data from Redis

Hello,

I have a data feed from Snort running under pfSense, going syslog >> Logstash >> Redis and then Redis >> Elasticsearch, as shown below.
The issue I have is that the original source (the Snort logs) contains very few messages, but in Elasticsearch the same events keep getting fed in over and over. Is there a setting I missed?
Please advise.
Thanks

The input config:

    input {
      udp {
        port => 5142
        type => "syslog"
      }
    }

    filter {
      if [host] =~ /172\.17\.37\.2/ or [host] =~ /172\.17\.143\.37\.1/ {
        grok {
          # break_on_match defaults to true, so grok stops at the
          # first pattern that matches
          match => { "message" => [ "\|%{SPACE}\[%{WORD:msg_source}%{GREEDYDATA}",
                                    "%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{WORD:msg_source}%{GREEDYDATA}" ] }
        }
      }
    }

    output {
      # Route each event to a Redis list based on its source
      if [msg_source] == "SNORTIDS" {
        redis { host => "127.0.0.1" data_type => "list" key => "ids_sensors" }
      } else if [msg_source] == "openvpn" {
        redis { host => "127.0.0.1" data_type => "list" key => "dlt_openvpn" }
      }
    }

The Elasticsearch output config:
    input {
      redis {
        host      => "127.0.0.1"
        type      => "redis-input"
        data_type => "list"
        key       => "ids_sensors"
      }
    }
    filter {
      # Parse the Snort alert line shipped through Redis
      grok {
        match => { "message" => "\|%{SPACE}\[%{WORD:msg_source}\[%{WORD:msg}\]\:%{SPACE}\[%{GREEDYDATA:sensor_name}\]%{SPACE}\]%{SPACE}\|\|%{SPACE}%{TIMESTAMP_ISO8601:event_timestamp}%{SPACE}%{INT:event_priority}%{SPACE}\[%{INT:gid}:%{INT:sid}:%{INT:rev}\]%{SPACE}%{DATA:alert_description}\|\|%{SPACE}%{DATA:classification}%{SPACE}\|\|%{SPACE}%{INT:protocol}%{SPACE}%{IP:SrcIp}%{SPACE}%{IP:DstIp}%{SPACE}\|\|%{SPACE}%{INT:SrcPort}%{SPACE}%{INT:DstPort}%{SPACE}" }
      }

      # GeoIP lookups against the source and destination IPs
      geoip {
        source => "SrcIp"
        target => "SrcGeo"
      }

      geoip {
        source => "DstIp"
        target => "DstGeo"
      }

      # If the alert is a Snort GPL alert, break it apart for easier
      # reading and categorization
      if [alert_description] =~ "GPL " {
        # Parse the category out of the alert description
        grok {
          match => { "alert_description" => "GPL\s+%{DATA:category}\s" }
        }
        # Store the rule type and normalize the category
        mutate {
          add_field => { "rule_type" => "Snort GPL" }
          lowercase => [ "category" ]
        }
      }

      # If the alert is an Emerging Threats alert, break it apart for
      # easier reading and categorization
      if [alert_description] =~ "ET " {
        grok {
          match => { "alert_description" => "ET\s+%{DATA:category}\s" }
        }
        mutate {
          add_field => { "rule_type" => "Emerging Threats" }
          lowercase => [ "category" ]
        }
      }

      mutate {
        convert => [ "SrcPort", "integer" ]
        convert => [ "DstPort", "integer" ]
        convert => [ "event_priority", "integer" ]
        convert => [ "protocol", "integer" ]
        remove_field => [ "message" ]
      }
    
      # Map the numeric Snort priority to a human-readable severity
      if [event_priority] == 1 {
        mutate { add_field => { "severity" => "High" } }
      } else if [event_priority] == 2 {
        mutate { add_field => { "severity" => "Medium" } }
      } else if [event_priority] == 3 {
        mutate { add_field => { "severity" => "Low" } }
      }

      # Build reference URLs for the signature
      mutate {
        add_field => [ "ET_Signature_Info", "http://doc.emergingthreats.net/%{sid}" ]
        add_field => [ "Snort_Signature_Info", "https://www.snort.org/search?query=%{gid}-%{sid}" ]
      }
  
      # Translate the numeric IP protocol into its name
      if [protocol] == 17 {
        mutate { replace => { "protocol" => "UDP" } }
      } else if [protocol] == 6 {
        mutate { replace => { "protocol" => "TCP" } }
      } else if [protocol] == 1 {
        mutate { replace => { "protocol" => "ICMP" } }
      } else if [protocol] == 2 {
        mutate { replace => { "protocol" => "IGMP" } }
      }
    }
    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "ids_sensors"
      }
    }

Is all of this configuration used in a single Logstash instance or do you have two?

Hi,
Sorry, I forgot to mention: I am using two different files, one for each config above.

Yes, but are both configuration files used by the same Logstash process?
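
If they are, that would explain the duplicates: a single Logstash instance reading both files combines them into one pipeline, so every event from every input passes through all the filters and all the outputs. An event read back from Redis still has `msg_source == "SNORTIDS"`, so it is pushed onto the `ids_sensors` list again and re-read, looping forever. As a rough sketch (the `from_redis` tag is just an example name, not something from your config), you could tag events on the Redis input and guard both outputs with it:

    # In the indexer file: tag everything read back from Redis
    input {
      redis {
        host      => "127.0.0.1"
        data_type => "list"
        key       => "ids_sensors"
        tags      => ["from_redis"]
      }
    }

    # In the shipper file: never re-queue events that came from Redis
    output {
      if "from_redis" not in [tags] and [msg_source] == "SNORTIDS" {
        redis { host => "127.0.0.1" data_type => "list" key => "ids_sensors" }
      }
    }

    # In the indexer file: only index events that came through Redis
    output {
      if "from_redis" in [tags] {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "ids_sensors"
        }
      }
    }

The cleaner fix, though, is to keep the two configs in separate pipelines or processes.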

Haha, OK, sorry for the newbie question :slight_smile: How do I make each config use its own process/instance?
Thanks
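
The simplest option is to start two Logstash processes, one per file. A sketch, assuming Logstash 5.x or later, where each instance needs its own `path.data` so they don't fight over the same lock (the file names and data paths below are made up, adjust to your setup):

    # Shipper: syslog -> Redis
    bin/logstash -f shipper.conf --path.data /var/lib/logstash-shipper

    # Indexer: Redis -> Elasticsearch
    bin/logstash -f indexer.conf --path.data /var/lib/logstash-indexer

On Logstash 6.x and later you could instead declare the two files as separate pipelines in `pipelines.yml`, which keeps them isolated inside a single process.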