Ingest pipeline pattern matching - much help needed

Hi all
I am using the following ES|QL to pattern-match a substring in a field from a filebeat index -

FROM filebeat-*
| WHERE url.original LIKE "q="

It would make a lot of sense to drop any incoming documents that don't contain "q=" but I am struggling with the documentation and syntax.

Has anyone done something similar and have a sample ingest pipeline that they could share, please?

Thank you!

PUT _ingest/pipeline/drop_non_q_url
{
  "description": "Drop documents where url.original does not start with 'q='",
  "processors": [
    {
      "terminate": {
        "if": "ctx.url == null"
      }
    },
    {
      "terminate": {
        "if": "ctx.url.original == null"
      }
    },
    {
      "drop": {
        "if": "!ctx.url.original.startsWith('q=')"
      }
    }
  ]
}

POST _ingest/pipeline/drop_non_q_url/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "message1",
        "url": {
          "original": "q=whatever"
        }
      }
    },
    {
      "_source": {
        "message": "message2",
        "url": {
          "original": "q=whateverelse"
        }
      }
    },
    {
      "_source": {
        "message": "message3",
        "url": {
          "original": "random"
        }
      }
    },
    {
      "_source": {
        "message": "message4"
      }
    }
  ]
}

generates

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "message": "message1",
          "url": {
            "original": "q=whatever"
          }
        },
        "_ingest": {
          "timestamp": "2025-06-16T16:31:33.718765Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "message": "message2",
          "url": {
            "original": "q=whateverelse"
          }
        },
        "_ingest": {
          "timestamp": "2025-06-16T16:31:33.718847Z"
        }
      }
    },
    null,
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "message": "message4"
        },
        "_ingest": {
          "timestamp": "2025-06-16T16:31:33.718858Z"
        }
      }
    }
  ]
}

??

Thank you Kevin, and apologies!! My ES|QL copy/paste was incorrect and should have read:
FROM filebeat-*
| WHERE url.original LIKE "*q=*"

I am trying to drop documents that don't contain "q=". The "q=" can appear anywhere in the "url.original" field. Here's a sample:
"url.original": "www.google.com/complete/search?client=chrome-omni&gs_ri=chrome-ext-ansg&xssi=t&q=keywordtoo&oit=1&cp=10&pgcl=7&gs_rn=42&psi=xNhkQk0150BOhl3u&sugkey=AIzaSyA2KlwBX3mkFo30om9LUFYQhpqLoa_BNhE"

In this the search term is "keywordtoo"

Once I have removed all the non-search-term records I would like to extract the term into a new field. In ES|QL it looks like this:
| GROK url.original "\?[^ ]*q=%{DATA:query}&"

Is that possible in an ingest pipeline?

Many thanks
Steve

Use contains instead of startsWith:

      "drop": {
        "if": "!ctx.url.original.contains('q=')"
      }
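
Not required here given the terminate processors, but the same keep/drop behaviour could also be collapsed into a single null-safe condition, just a sketch using Painless's ?. operator (docs without url.original are still kept):

      "drop": {
        "if": "ctx.url?.original != null && !ctx.url.original.contains('q=')"
      }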

Sure, but just requires a bit more work to extract the value from a regex, and to add another processor.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
  "description": "Drop documents where url.original does not start with 'q='",
  "processors": [
    {
      "terminate": {
        "if": "ctx.url == null"
      }
    },
    {
      "terminate": {
        "if": "ctx.url.original == null"
      }
    },
    {
      "drop": {
        "if": "!ctx.url.original.contains('q=')"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          def m = /[^ ]*q=([^&]+)/.matcher(ctx.url.original);
          if (m.find()) {
           ctx.newfield1 = m.group(1);
          }
        """
      }
    }
  ]
  },
  "docs": [
    {
      "_source": {
        "message": "message1",
        "url": {
          "original": "q=whatever"
        }
      }
    },
    {
      "_source": {
        "message": "message2",
        "url": {
          "original": "q=whateverelse"
        }
      }
    },
    {
      "_source": {
        "message": "message3",
        "url": {
          "original": "random"
        }
      }
    },
    {
      "_source": {
        "message": "message4"
      }
    },
    {
      "_source": {
        "message": "message5",
        "url": {
          "original": "www.google.com/complete/search?client=chrome-omni&gs_ri=chrome-ext-ansg&xssi=t&q=keywordtoo&oit=1&cp=10&pgcl=7&gs_rn=42&psi=xNhkQk0150BOhl3u&sugkey=AIzaSyA2KlwBX3mkFo30om9LUFYQhpqLoa_BNhE"
        }
      }
    }
  ]
}
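
On the GROK question: the same extraction should also be possible with a grok processor instead of the script, roughly like this (a sketch, not tested against real data; the query field name is just an example, and ignore_failure keeps docs where the pattern does not match):

    {
      "grok": {
        "field": "url.original",
        "patterns": ["q=%{DATA:query}(&|$)"],
        "ignore_failure": true
      }
    }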

Thanks again Kevin!
The pipeline works great when testing in Dev Tools and the Ingest Pipelines Kibana interface.
I thought all I would need to do is specify the pipeline in the filebeat.yml and all would be good, but this doesn't seem to work. I mention this in case you have a better idea?
I already feel that I have taken advantage of your kindness so no problem if not.
I have learnt a lot from the info provided and I am grateful.
My Elastic learning has always been "two steps forward, followed by a random number of steps in a different direction" :)
Cheers!
Steve

@SteveParker

Share your filebeat.yml perhaps we can help.


Well no, not yet, but another tip is to add a field/value (as the first processor) to every doc that goes through the pipeline, just to show whether it was processed by the pipeline:

    {
      "set": {
        "field": "pipelineStatus",
        "value": "processed-by-my-first-pipeline"
      }
    },

And another tip: do the _simulate on samples of your actual, real documents, not my toy testing docs.
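
For example, copy the _source of a few real events and run them through the pipeline with verbose output, which shows what each processor did (a sketch; the verbose flag is optional):

POST _ingest/pipeline/drop_non_q_url/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "message": "paste a real event here",
        "url": {
          "original": "paste a real url.original value here"
        }
      }
    }
  ]
}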

Thanks Kevin,
As suspected, the new field is added when testing in the Kibana interface but not when running the pipeline from the filebeat.yml.
I will keep looking :)

Thank you Stephen.
The filebeat agent is working and sending logs to the cluster. I added -
pipeline: "drop_non_q_url_test" but this has not worked. the filebeat.yml is as follows:

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  enabled: false

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml # We have enabled the panw module for syslog
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:
  host: "redacted"
  username: "redacted"  
  password: "redacted"
  
output.elasticsearch:
  hosts: ["redacted"]
  preset: balanced
  protocol: "https"
  username: "redacted"
  password: "redacted"
  ssl.certificate_authorities: redacted
  #
  #
  pipeline: "drop_non_q_url_test"

processors:
  - add_host_metadata:
    when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~



monitoring.enabled: true

monitoring.elasticsearch:
http.enabled: true

Are you using a module? Because otherwise, according to this configuration, nothing is being read by Filebeat.

If you're using a module, please share that configuration yml.

If you're using a module, then that pipeline setting most likely will not work, because the pipeline is set within the module.

Please put three backticks ``` before and after the code and it will format it for you


I am using the panw module as follows:

- module: panw
  panos:
    enabled: true
    var.input: "syslog"
    var.syslog_host: 0.0.0.0
    var.syslog_port: 514

If you are using a module, the parsing is being done by an ingest pipeline.

I do not use Filebeat modules, but if they are similar to Elastic Agent integrations, there should be a managed ingest pipeline that parses the log, which you should not edit, and that pipeline can call a custom ingest pipeline. The custom pipeline is where you would add your processor to remove the logs you don't want.
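
With Elastic Agent integrations that hook is an @custom pipeline; if the panw module works the same way, the idea would look roughly like this (the pipeline name logs-panw.panos@custom is a guess based on the Agent naming convention, so check which custom pipeline, if any, your module's managed pipeline actually calls):

PUT _ingest/pipeline/logs-panw.panos@custom
{
  "processors": [
    {
      "drop": {
        "if": "ctx.url?.original != null && !ctx.url.original.contains('q=')"
      }
    }
  ]
}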