Painless script errors

Hi there,

I need some quick help with a Painless script implementation for one piece of logic.

The Environment:

I am ingesting a huge number of documents in the following manner:
Filebeat > Logstash > Elasticsearch

Filebeat just collects data from multiple JSON files (no additional configuration other than the basic input and the Logstash output).

Logstash receives the docs from Filebeat, adds/renames some fields using the mutate filter, and sends the data to the elasticsearch output (to a data stream).
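For reference, the Logstash side is basically just a beats input, a small mutate, and the elasticsearch output pointed at the data stream, roughly like the sketch below (the port, credentials, rename example, and data-stream names are placeholders/guesses, not my exact config):

input {
  beats {
    port => 5044                              # Filebeat sends the raw JSON docs here
  }
}

filter {
  mutate {
    # add / rename a few fields, for example:
    rename => { "[log][file][path]" => "fn" }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "changeme"                    # placeholder
    data_stream => "true"
    data_stream_type => "logs"
    data_stream_dataset => "webcrawl"         # guessed from logs-webcrawl-filestream-dev
    data_stream_namespace => "filestream-dev"
  }
}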

The Requirement:
As I am uploading a lot of documents, I want to add a condition: if the new document already exists in Elasticsearch, drop the new document; if it doesn't exist, ingest it.

Now I have two ways to do this (please suggest if there is any other, less complex way to do it; I'd really appreciate it).

1. From Logstash:

While reading the data, the default message field will have this structure:

{
  "url": "https://zyuuki.com/newsrelease/%e3%80%90%e3%83%96%e3%83%ad%e3%82%b0%e3%80%91%e6%9c%ac%e6%97%a5%e3%81%ae%e5%80%89%e5%ba%ab%e2%99%aa",
  "lan": "jpn,eng",
  "wd": "2024-02-27T20:19:45Z",
  "cn": "【ブログ】本日の倉庫♪ | 什器ドットコム|店舗 展示会 什器レンタル 展示会用・店舗用レンタル什器・什器レンタルの事なら [什器.com] ................. All rights reserved. "
 }

Hence I used grok to split the data into the fields url, lan, wd, and cn.

After this, the url has to be unique across all documents, so my target is to compare the url field against the data already ingested in the index: if it exists, drop the new document; if the new document does not exist in the index, we can proceed.

Below is the filter I used for this; the commented-out parts are my different approaches to matching the data.

filter {

  grok {
        match => {
               "message" => [
                     "\s+\{\n\s+\"url\"\:\s+\"(?<url>[^\s]+)\"\,\n\s+\"lan\"\:\s+\"(?<lan>[^\s]+)\"\,\n\s+\"wd\"\:\s+\"(?<wd>[^\s]+)\"\,\n\s+\"cn\"\:\s+(?<cn>.*)\n\s+}",
                     "\{\n\s+\"url\"\:\s+\"(?<url>[^\s]+)\"\,\n\s+\"lan\"\:\s+null\,\n\s+\"wd\"\:\s+\"(?<wd>[^\s]+)\"\,\n\s+\"cn\"\:\s+(?<cn>.*)\n\s+}"
                     ]
                 }
      }

  mutate {
        gsub => [
          "cn", '\\\"', '',
          "lan", '\\\"', '',
          "ht", '\\\"', '',
          "url", '\\\"', ''
        ]
      }

  mutate {  gsub => [ "cn", "\\\"", ""]      }

  mutate {  strip => ["url", "cn", "lan", "ht"]  }


  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "ADQ@sravankumar"
    index => "logs-webcrawl-filestream-dev"
    #query => "wu.original:%{[url]}"  # Use match query for URL matching
    #query => {
    #  match => {
    #    "wu.original" => "%{[url]}"
    #  }
    #}
    #query => {
    #  "query_string" => {
    #    "wu.original" => "%{[url]}"
    #  }
    #}
    #query_template => "/etc/logstash/conf.d/template.json"
    query => {
      "query_string" => {
        "query" =>  "url:%{[url]}"
      }
    }

    fields => { "hits.total" => "total_hits" }
  }

  if ![total_hits] {
    # If no match is found, ingest the event as a new document
    mutate {
      remove_field => ["total_hits"]   # Remove total_hits field to avoid conflicts
           }

     mutate {
      remove_field => ["event.original", "event.original.keyword", "host.architecture", "host.containerized", "host.id", "host.mac","host.os.codename", "host.os.kernel", "host.os.family", "host.os.name", "host.os.platform", "host.os.type", "host.os.version", "input.type", "html", "ht", "agent.ephemeral_id", "agent.id", "agent.type", "agent.version", "data_stream.dataset", "data_stream.namespace", "data_stream.type", "ecs.version", "agent.name"]
    }
  }
  else {
    # If a match is found, drop the event
    drop {}
  }
}

The problem with this approach is the format of the url field, which contains special characters; the mutate > gsub I used here is not handling it effectively.

The url ends up being ingested looking something like the below in the JSON:


"/"https://zyuuki.com/newsrelease/%e3%80%90%e3%83%96%e3%83%ad%e3%82%b0%e3%80%91%e6%9c%ac%e6%97%a5%e3%81%ae%e5%80%89%e5%ba%ab%e2%99%aa/""

hence it's not working.
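I suspect the gsub needs different escaping, something along these lines (just a guess I haven't verified yet; it is meant to strip the leftover backslashes and quotes from the grok capture):

mutate {
  gsub => [
    # "\\\\" unescapes to the regex \\ , i.e. one literal backslash
    "url", "\\\\", "",
    # then drop any remaining double quotes
    "url", "\"", ""
  ]
}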

2. From Ingest Pipeline - Scripts:

Although I could fix the above issue with another grok, I feel like I am putting too much on the Logstash pipeline, so I replicated the same thing using an ingest pipeline, and it worked!

Below is my ingest pipeline:

{
  "description": "for testing",
  "processors": [
    {
      "append": {
        "field": "pipeline",
        "value": [
          "{{ _ingest.pipeline }}"
        ],
        "allow_duplicates": false,
        "ignore_failure": true
      }
    },
    {
      "grok": {
        "field": "message",
        "ignore_missing": true,
        "ignore_failure": true,
        "patterns": [
          "\\s+\\{\\n\\s+\\\"url\\\"\\:\\s+\\\"(?<url>[^\\s]+)\\\"\\,\\n\\s+\\\"lan\\\"\\:\\s+\\\"(?<lan>[^\\s]+)\\\"\\,\\n\\s+\\\"wd\\\"\\:\\s+\\\"(?<wd>[^\\s]+)\\\"\\,\\n\\s+\\\"cn\\\"\\:\\s+(?<cn>.*)\\n\\s+}",
          "\\{\\n\\s+\\\"url\\\"\\:\\s+\\\"(?<url>[^\\s]+)\\\"\\,\\n\\s+\\\"lan\\\"\\:\\s+null\\,\\n\\s+\\\"wd\\\"\\:\\s+\\\"(?<wd>[^\\s]+)\\\"\\,\\n\\s+\\\"cn\\\"\\:\\s+(?<cn>.*)\\n\\s+}"
        ]
      }
    },
    {
      "rename": {
        "field": "host.name",
        "target_field": "sh"
      }
    },
    {
      "rename": {
        "field": "log.file.path",
        "target_field": "fn",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "date": {
        "field": "wd",
        "formats": [
          "yyyy-MM-dd'T'HH:mm:ss'Z'"
        ],
        "target_field": "wd",
        "ignore_failure": true
      }
    },
    {
      "trim": {
        "field": "url"
      }
    },
    {
      "set": {
        "field": "wu",
        "value": "{{url}}"
      }
    },
    {
      "uri_parts": {
        "field": "wu",
        "target_field": "wu",
        "ignore_failure": true
      }
    },
    {
      "gsub": {
        "field": "cn",
        "pattern": "\\\"",
        "replacement": "",
        "ignore_failure": true
      }
    },
    {
      "gsub": {
        "field": "lan",
        "pattern": "\\\"",
        "replacement": "",
        "ignore_failure": true
      }
    },
    {
      "gsub": {
        "field": "ht",
        "pattern": "\\,$",
        "replacement": "",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "json": {
        "field": "ht",
        "add_to_root": true,
        "ignore_failure": true
      }
    },
    {
      "foreach": {
        "field": "Envelope.Payload-Metadata.HTTP-Response-Metadata.HTML-Metadata.Links",
        "processor": {
          "script": {
            "source": "\r\n      def links = ctx['Envelope']['Payload-Metadata']['HTTP-Response-Metadata']['HTML-Metadata']['Links'];\r\n      for (int i = 0; i < links.size(); i++) {\r\n        if (links[i].text == null) {\r\n          links[i].text = null;\r\n        }\r\n        if (links[i].url == null) {\r\n          links[i].url = null;\r\n        }\r\n        if (links[i].rel == null) {\r\n          links[i].rel = null;\r\n        }\r\n        if (links[i].title == null) {\r\n          links[i].title = null;\r\n        }\r\n        if (links[i].path == null) {\r\n          links[i].path = null;\r\n        }\r\n      }\r\n    "
          }
        },
        "ignore_failure": true
      }
    },
    {
      "remove": {
        "field": [
          "message",
          "host.architecture",
          "host.containerized",
          "host.id",
          "host.mac",
          "host.os.codename",
          "host.os.kernel",
          "host.os.family",
          "host.os.name",
          "host.os.platform",
          "host.os.type",
          "host.os.version",
          "input.type",
          "html",
          "ht",
          "url",
          "event.original",
          "agent.ephemeral_id",
          "agent.id",
          "agent.type",
          "agent.version",
          "data_stream.dataset",
          "data_stream.namespace",
          "data_stream.type",
          "ecs.version",
          "agent.name"
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ],
  "on_failure": [
    {
      "append": {
        "field": "error.message",
        "value": [
          "{{ _ingest.on_failure_message }}"
        ],
        "allow_duplicates": false
      }
    }
  ]
}

The above pipeline works for the JSON file ingestion as expected, but now I need to write a script processor for the compare-the-url-and-drop-if-it-exists logic.

For that I came up with the script below, which is not working and gives compile errors:

def url = ctx['url'];
def existing_docs = ctx.client.search([
  'index': 'logs-webcrawl-filestream-dev',
  'body': [
    'query': [
      'match_phrase': [
        'wu.original': url
      ]
    ]
  ]
]);
if (existing_docs.hits.total.value == 0) {
  // If no match is found, proceed with uploading
} else {
  // If a match is found, drop the document
  ctx.drop();
}

Please help me fix the script, as I feel that working in the ingest pipeline is much more effective than pushing everything onto the Logstash pipeline.

Hey,
Unfortunately, the ingest pipeline context doesn't have access to other documents, so there is no way to remove duplicates there before the data is indexed. You can use this blog post to find an answer to your question. Basically, you can filter with the Logstash solution.

Hi @Yan_Savitski,

Thanks for ruling out the ingest pipeline option. Let me go back to my first attempt with Logstash and try to find the solution there.

I went through the blog you posted. That solution removes duplicates, but what I am trying to do is conditionally remove duplicates by matching against the new documents, and I also want to add some more conditions in the future.

Example:
if url matches and wd (a date-type field) is the latest one - index / update
if url matches and wd is not the latest compared to the new one - don't index, drop

So I honestly don't want to complicate the process by using fingerprint, as it sets a custom _id, which will slow my indexing performance according to the documentation.
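(For context, my understanding of the blog's fingerprint approach is roughly the sketch below, where document_id is set from the hash; that forced _id is exactly the part I'd like to avoid. The index name is just reused from above.)

filter {
  fingerprint {
    source => ["url"]
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs-webcrawl-filestream-dev"
    # duplicates overwrite each other instead of piling up, but every write carries a custom _id
    document_id => "%{[@metadata][fingerprint]}"
  }
}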

I have a working solution using Python code, but the drawback is indexing speed (2-3k documents per minute), whereas from Logstash or Filebeat it would be around 3 lakh (300k) documents per minute.

So I want to use Logstash to do the same, roughly along the lines of the sketch below.
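This is only a rough, untested sketch: it reuses the index and query_template path from my pipeline above, the template would hold a match_phrase query on wu.original against %{[url]}, and existing_wd is just a scratch field name I made up:

filter {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "changeme"                                    # placeholder
    index => "logs-webcrawl-filestream-dev"
    query_template => "/etc/logstash/conf.d/template.json"    # match_phrase on wu.original
    fields => { "wd" => "existing_wd" }                       # wd of the matching doc, if any
  }

  if [existing_wd] {
    ruby {
      init => "require 'time'"
      code => '
        existing = (Time.parse(event.get("existing_wd").to_s) rescue nil)
        incoming = (Time.parse(event.get("wd").to_s) rescue nil)
        # drop the event when the already-indexed wd is the same or newer
        event.cancel if existing && incoming && incoming <= existing
      '
    }
    mutate { remove_field => ["existing_wd"] }
  }
}

This only covers the drop side; replacing the already-indexed document when the new wd is newer would still need separate handling.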

So if you can, please review my Logstash pipeline already provided above; if there are any suggestions, I'd really appreciate your help.

Hey,
I moved this topic to the Elastic Stack / Logstash category, as the engineers there have more expertise in this type of question.
Let me know if I can help you with any Search questions.
