Hello all,
I have a pipeline with a JDBC connection to a MySQL database, pulling large documents with many values.
I have added a ruby filter [1] to remove padded zeroes from one of the fields and save the unpadded value to another field.
I have then added a fingerprint filter to my pipeline [2] to flag certain documents as duplicates based on repeating fields, and to remove those duplicates before ingesting into Elasticsearch.
Unfortunately, I have found that about 120 documents (out of roughly 150,000) were completely dropped/skipped. Based on the fingerprinting logic I've tried to implement, the unpad_skid field was unique for those documents, so they should not have been dropped. I've provided an example of the data below [3].
[1]
mutate {
  copy => { "skid" => "unpad_skid" }
}
mutate {
  convert => {
    "unpad_skid" => "string"
  }
}
ruby {
  code => "
    field_name = event.get('unpad_skid')
    if field_name && field_name.length >= 13
      # Remove middle zeros from the string
      field_name = field_name.sub(/(^0+)|((?<=^|[^0])0+(?=0|[^0]))|0+$/, '')
      event.set('unpad_skid', field_name)
    end
  "
}
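For reference, here is how that stripping step behaves when run standalone against the example skid from [3] (a minimal sketch in plain Ruby, outside the Logstash event API). Note that String#sub replaces only the first matching run of zeros, so the single zero later in the value is kept:

skid = '11000007923105'

# sub removes only the first matching run of zeros (here "00000")
unpad_skid = skid.sub(/(^0+)|((?<=^|[^0])0+(?=0|[^0]))|0+$/, '')

puts unpad_skid  # => "117923105", matching the unpad_skid value shown in [3]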
[2]
fingerprint {
  source => ["hostname", "unpad_skid", "name", "@timestamp", "uuid"]
  concatenate_sources => true
  target => "[@metadata][fingerprint]"
  method => "MURMUR3"
}
}
output {
  elasticsearch {
    hosts => ["https://elasticsearchnode1.aa.com:9200", "https://elasticsearchnode2.aa.com:9200"]
    data_stream => false
    index => "indexname-deduplicated-%{+YYYY.MM}"
    document_id => "%{[@metadata][fingerprint]}"
    ssl_enabled => true
    ssl_certificate_authorities => '/usr/share/logstash/certs/ca/ca.crt'
    user => "${ELASTIC_USER}"
    password => "${ELASTIC_PASSWORD}"
  }
}
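To show how I understand the dedup to work: the fingerprint becomes the Elasticsearch document _id, so any two events that produce the same fingerprint end up as a single document. Below is a rough standalone sketch of that idea; it assumes the murmurhash3 gem and MurmurHash3::V32.str_hash (which I believe is what the fingerprint filter uses for MURMUR3), and it joins the source values with '|' only as an approximation of what concatenate_sources actually produces:

require 'murmurhash3'

# Sample event from [3]
event = {
  'hostname'   => 'ab-host-441',
  'unpad_skid' => '117923105',
  'name'       => 'Strange Event ::[Custom] Service Test',
  '@timestamp' => '2024-05-12T23:20:15.000Z',
  'uuid'       => '8656k31-b269-12az-6312-3ed23665kj19'
}

# Assumption: joining the source values with '|' is only a stand-in
# for whatever concatenate_sources builds internally
concatenated = ['hostname', 'unpad_skid', 'name', '@timestamp', 'uuid']
                 .map { |f| event[f] }
                 .join('|')

# Assumption: MurmurHash3::V32.str_hash is what method => "MURMUR3" maps to
fingerprint = MurmurHash3::V32.str_hash(concatenated)

# This 32-bit value is what ends up in document_id, so two events that
# hash to the same value would overwrite each other in the index
puts fingerprint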
[3]
skid: '11000007923105'
unpad_skid: '117923105'
name: 'Strange Event ::[Custom] Service Test'
hostname: 'ab-host-441'
uuid: 8656k31-b269-12az-6312-3ed23665kj19
@timestamp: 2024-05-12T23:20:15.000Z
As stated, some of the documents with unique skid/unpad_skid values have been completely ignored by the pipeline. Why might that be?
Thanks