Hi there,
I need some quick help with a Painless script implementation for one piece of logic.
The Environment:
I am ingesting a huge number of documents in the following manner:
Filebeat > Logstash > Elasticsearch
Filebeat just collects data from multiple JSON files (no additional configuration other than the basic input and the Logstash output).
Logstash receives the docs from Filebeat, adds/renames some fields using the mutate filter, and sends the data to the Elasticsearch output (to a data stream).
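For context, the Filebeat side is essentially just this (a minimal sketch; the paths and Logstash host below are placeholders, not my real values):

filebeat.inputs:
  - type: filestream
    id: webcrawl-json
    paths:
      - /data/crawl/*.json

output.logstash:
  hosts: ["localhost:5044"]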
The Requirement:
As I am uploading a lot of documents, I want to add a condition: if a new document already exists in Elasticsearch, drop it; if it does not exist, ingest it.
Right now I have two ways to do this, plus one more idea at the very end of this post (please suggest if there is any less complex way to do it, I would really appreciate it).
1. From Logstash:
While reading the data, the default message field has the following structure:
{
"url": "https://zyuuki.com/newsrelease/%e3%80%90%e3%83%96%e3%83%ad%e3%82%b0%e3%80%91%e6%9c%ac%e6%97%a5%e3%81%ae%e5%80%89%e5%ba%ab%e2%99%aa",
"lan": "jpn,eng",
"wd": "2024-02-27T20:19:45Z",
"cn": "【ブログ】本日の倉庫♪ | 什器ドットコム|店舗 展示会 什器レンタル 展示会用・店舗用レンタル什器・什器レンタルの事なら [什器.com] ................. All rights reserved. "
}
Hence I used grok to split the data into the fields url, lan, wd, and cn.
After this, the url has to be unique across all documents, so my target is to compare the url field against the data already ingested in the index: if it exists, drop the new document; if it does not, proceed with ingestion.
For this, below is the filter I used; the commented-out parts are my different attempts at matching the data.
filter {
  grok {
    match => {
      "message" => [
        "\s+\{\n\s+\"url\"\:\s+\"(?<url>[^\s]+)\"\,\n\s+\"lan\"\:\s+\"(?<lan>[^\s]+)\"\,\n\s+\"wd\"\:\s+\"(?<wd>[^\s]+)\"\,\n\s+\"cn\"\:\s+(?<cn>.*)\n\s+}",
        "\{\n\s+\"url\"\:\s+\"(?<url>[^\s]+)\"\,\n\s+\"lan\"\:\s+null\,\n\s+\"wd\"\:\s+\"(?<wd>[^\s]+)\"\,\n\s+\"cn\"\:\s+(?<cn>.*)\n\s+}"
      ]
    }
  }
  mutate {
    gsub => [
      "cn", '\\\"', '',
      "lan", '\\\"', '',
      "ht", '\\\"', '',
      "url", '\\\"', ''
    ]
  }
  mutate { gsub => [ "cn", "\\\"", "" ] }
  mutate { strip => ["url", "cn", "lan", "ht"] }
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "ADQ@sravankumar"
    index => "logs-webcrawl-filestream-dev"
    #query => "wu.original:%{[url]}" # Use match query for URL matching
    #query => {
    #  match => {
    #    "wu.original" => "%{[url]}"
    #  }
    #}
    #query => {
    #  "query_string" => {
    #    "wu.original" => "%{[url]}"
    #  }
    #}
    #query_template => "/etc/logstash/conf.d/template.json"
    query => {
      "query_string" => {
        "query" => "url:%{[url]}"
      }
    }
    fields => { "hits.total" => "total_hits" }
  }
  if ![total_hits] {
    # If no match is found, ingest the event as a new document
    mutate {
      remove_field => ["total_hits"] # Remove total_hits field to avoid conflicts
    }
    mutate {
      remove_field => ["event.original", "event.original.keyword", "host.architecture", "host.containerized", "host.id", "host.mac", "host.os.codename", "host.os.kernel", "host.os.family", "host.os.name", "host.os.platform", "host.os.type", "host.os.version", "input.type", "html", "ht", "agent.ephemeral_id", "agent.id", "agent.type", "agent.version", "data_stream.dataset", "data_stream.namespace", "data_stream.type", "ecs.version", "agent.name"]
    }
  }
  else {
    # If a match is found, drop the event
    drop {}
  }
}
The problem with this approach is the format of the url field, which contains special characters, and the mutate > gsub I used here is not handling them effectively.
The url is being ingested looking something like the below in the JSON:
"/"https://zyuuki.com/newsrelease/%e3%80%90%e3%83%96%e3%83%ad%e3%82%b0%e3%80%91%e6%9c%ac%e6%97%a5%e3%81%ae%e5%80%89%e5%ba%ab%e2%99%aa/""
hence the query match is not working.
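As an example of what I mean, something like the below would be a simpler way to strip that residue (just a sketch, assuming the stray characters really are literal quotes, backslashes, and slashes as in the sample above), but I have not managed to make this work reliably either:

mutate {
  # remove stray backslashes and double quotes anywhere in the field,
  # then trim a leading/trailing slash left behind (careful: this would
  # also eat a legitimate trailing slash)
  gsub => [
    "url", '[\\"]', '',
    "url", '^/|/$', ''
  ]
}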
2. From Ingest Pipeline - Scripts:
Although I could fix the above issue with another grok, I feel like I am putting too much on the Logstash pipeline, so I replicated the same logic using an ingest pipeline, and it worked!
Below is my ingest pipeline:
{
"description": "for testing",
"processors": [
{
"append": {
"field": "pipeline",
"value": [
"{{ _ingest.pipeline }}"
],
"allow_duplicates": false,
"ignore_failure": true
}
},
{
"grok": {
"field": "message",
"ignore_missing": true,
"ignore_failure": true,
"patterns": [
"\\s+\\{\\n\\s+\\\"url\\\"\\:\\s+\\\"(?<url>[^\\s]+)\\\"\\,\\n\\s+\\\"lan\\\"\\:\\s+\\\"(?<lan>[^\\s]+)\\\"\\,\\n\\s+\\\"wd\\\"\\:\\s+\\\"(?<wd>[^\\s]+)\\\"\\,\\n\\s+\\\"cn\\\"\\:\\s+(?<cn>.*)\\n\\s+}",
"\\{\\n\\s+\\\"url\\\"\\:\\s+\\\"(?<url>[^\\s]+)\\\"\\,\\n\\s+\\\"lan\\\"\\:\\s+null\\,\\n\\s+\\\"wd\\\"\\:\\s+\\\"(?<wd>[^\\s]+)\\\"\\,\\n\\s+\\\"cn\\\"\\:\\s+(?<cn>.*)\\n\\s+}"
]
}
},
{
"rename": {
"field": "host.name",
"target_field": "sh"
}
},
{
"rename": {
"field": "log.file.path",
"target_field": "fn",
"ignore_missing": true,
"ignore_failure": true
}
},
{
"date": {
"field": "wd",
"formats": [
"yyyy-MM-dd'T'HH:mm:ss'Z'"
],
"target_field": "wd",
"ignore_failure": true
}
},
{
"trim": {
"field": "url"
}
},
{
"set": {
"field": "wu",
"value": "{{url}}"
}
},
{
"uri_parts": {
"field": "wu",
"target_field": "wu",
"ignore_failure": true
}
},
{
"gsub": {
"field": "cn",
"pattern": "\\\"",
"replacement": "",
"ignore_failure": true
}
},
{
"gsub": {
"field": "lan",
"pattern": "\\\"",
"replacement": "",
"ignore_failure": true
}
},
{
"gsub": {
"field": "ht",
"pattern": "\\,$",
"replacement": "",
"ignore_missing": true,
"ignore_failure": true
}
},
{
"json": {
"field": "ht",
"add_to_root": true,
"ignore_failure": true
}
},
{
"foreach": {
"field": "Envelope.Payload-Metadata.HTTP-Response-Metadata.HTML-Metadata.Links",
"processor": {
"script": {
"source": "\r\n def links = ctx['Envelope']['Payload-Metadata']['HTTP-Response-Metadata']['HTML-Metadata']['Links'];\r\n for (int i = 0; i < links.size(); i++) {\r\n if (links[i].text == null) {\r\n links[i].text = null;\r\n }\r\n if (links[i].url == null) {\r\n links[i].url = null;\r\n }\r\n if (links[i].rel == null) {\r\n links[i].rel = null;\r\n }\r\n if (links[i].title == null) {\r\n links[i].title = null;\r\n }\r\n if (links[i].path == null) {\r\n links[i].path = null;\r\n }\r\n }\r\n "
}
},
"ignore_failure": true
}
},
{
"remove": {
"field": [
"message",
"host.architecture",
"host.containerized",
"host.id",
"host.mac",
"host.os.codename",
"host.os.kernel",
"host.os.family",
"host.os.name",
"host.os.platform",
"host.os.type",
"host.os.version",
"input.type",
"html",
"ht",
"url",
"event.original",
"agent.ephemeral_id",
"agent.id",
"agent.type",
"agent.version",
"data_stream.dataset",
"data_stream.namespace",
"data_stream.type",
"ecs.version",
"agent.name"
],
"ignore_missing": true,
"ignore_failure": true
}
}
],
"on_failure": [
{
"append": {
"field": "error.message",
"value": [
"{{ _ingest.on_failure_message }}"
],
"allow_duplicates": false
}
}
]
}
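For anyone trying to reproduce this: I verify the pipeline with the standard _simulate API. The pipeline name and the sample document below are just examples:

POST _ingest/pipeline/webcrawl-dedup-test/_simulate
{
  "docs": [
    {
      "_source": {
        "message": " {\n  \"url\": \"https://zyuuki.com/newsrelease/test\",\n  \"lan\": \"jpn,eng\",\n  \"wd\": \"2024-02-27T20:19:45Z\",\n  \"cn\": \"sample content\"\n }",
        "host": { "name": "test-host" }
      }
    }
  ]
}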
The above pipeline is working as expected for the JSON file ingestion, but now I need to write a script processor for the "compare the url and drop if it exists" logic.
For that I came up with the below script, which is not working and is giving compile errors:
def url = ctx['url'];
def existing_docs = ctx.client.search([
'index': 'logs-webcrawl-filestream-dev',
'body': [
'query': [
'match_phrase': [
'wu.original': url
]
]
]
]);
if (existing_docs.hits.total.value == 0) {
// If no match is found, proceed with uploading
} else {
// If a match is found, drop the document
ctx.drop();
}
Please help me fix the script, as I feel working in the ingest pipeline is much more effective than pushing everything onto the Logstash pipeline.
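PS: the one more idea I mentioned at the top, in case the script route is a dead end, is to derive the document _id from the url so that duplicates get rejected at index time instead of being searched for. A sketch of the two extra processors is below (the field names are just examples, and I am not sure how this behaves with a data stream across rollovers, since each backing index enforces _id uniqueness separately):

{
  "fingerprint": {
    "fields": ["url"],
    "target_field": "url_hash",
    "method": "SHA-256"
  }
},
{
  "set": {
    "field": "_id",
    "copy_from": "url_hash"
  }
}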