[Threat Intelligence]: Avoid redundancy of information in the same index

Hello,

I am using this pipeline to enrich my SIEM with URLhaus information:

input {
  exec {
    command => 'curl https://urlhaus.abuse.ch/downloads/csv/ --output text.zip && unzip -c text.zip'
    interval => 86400
    type => 'iphaus'
    codec => line
  }
}
filter {
  if [type] == "iphaus" {
    csv {
      columns => ["id","dateadded","url","url_status","threat","tags","urlhaus_link","reporter"]
      separator => ","
    }
    mutate {
      remove_field => ["message"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://X.X.X.X:9200"]
    index => "malware-%{+YYYY.MM.dd}"
    cacert => 'ca.crt'
    user => "elastic"
    password => "password"
  }
}

The only problem is that because I download the feed every 24 hours, I get duplicates in my index: the same URL is present in both yesterday's and today's download.

I would like to know how I can match each URL against the url field already in my index, to check whether it is present, and only index it if it is not already there.

Thanks for your help.

If you set the document_id on the elasticsearch output based on some field, or a fingerprint generated from multiple fields of the document, then the elasticsearch output will overwrite the document instead of inserting a duplicate.
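For example, the simplest version uses the url field itself as the ID. A minimal sketch based on your output section (assuming every event carries a url field, and using a fixed index name, because the overwrite only happens within a single index; with malware-%{+YYYY.MM.dd} the same URL would still be indexed once per day):

output {
  elasticsearch {
    hosts => ["https://X.X.X.X:9200"]
    # fixed index name: document_id only deduplicates within one index
    index => "malware"
    # indexing with an existing ID overwrites that document
    document_id => "%{url}"
    cacert => 'ca.crt'
    user => "elastic"
    password => "password"
  }
}

Bear in mind that Elasticsearch limits the _id to 512 bytes and URLs can get long, so hashing the URL with a fingerprint is the safer option.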

Thanks for your answer @Badger,
Could you please tell me how I can set that up in my Logstash configuration?

To generate a fingerprint, use a fingerprint filter:

fingerprint {
    concatenate_sources => true 
    method => "SHA256" 
    source => [ "url" ] # And possibly other fields
    target => "[@metadata][fingerprint]"
}

Then reference it in the elasticsearch output using a sprintf reference:

document_id => "%{[@metadata][fingerprint]}"
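
Putting it all together with your pipeline, the filter and output sections would look something like this (a sketch based on your config, not tested against your data):

filter {
  if [type] == "iphaus" {
    csv {
      columns => ["id","dateadded","url","url_status","threat","tags","urlhaus_link","reporter"]
      separator => ","
    }
    # identical URLs always hash to the same ID
    fingerprint {
      concatenate_sources => true
      method => "SHA256"
      source => [ "url" ]
      # stored under @metadata so the hash is not indexed with the event
      target => "[@metadata][fingerprint]"
    }
    mutate {
      remove_field => ["message"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://X.X.X.X:9200"]
    # fixed index name, so tomorrow's download hits the same index
    index => "malware"
    document_id => "%{[@metadata][fingerprint]}"
    cacert => 'ca.crt'
    user => "elastic"
    password => "password"
  }
}

Re-downloading the feed then re-indexes each known URL over its existing document instead of creating a duplicate.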

Thank you very much @Badger,

I just tried it and it's working like a charm! 🤩
