Check if value is 'null' So it Can Be Used in a Fingerprint and Document _id

Hello,

I am working to set up a fingerprint on a field if it is not null. If it is null, I will fingerprint a different field (which is never null). Currently, my Filebeat processors look like this:

processors:
     - "decode_json_fields": ...
     - "drop_fields": ...
     - if:
         equals:
           data.p_id: null
           # this is where the issue is: equals can only accept int, strings, or boolean
       then:
         - fingerprint:
            fields: ["data.id"] ...
       else:
         - fingerprint:
            fields: ["data.p_id"] ...

I have also tried using when with an equals condition, but that did not work either. Does anyone have any suggestions?

Hi @iFamZ Welcome to the community!

Thinking you might need to use the has_fields condition as a top level condition

Hey @stephenb! Thanks for your prompt reply - I greatly appreciate it.

I did come across has_fields but the way I interpreted it is that it will check that the noted field exists in the document. In my case, data.p_id exists in the document, but in some cases it is null. Regardless, giving this a try!
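The distinction described above (a key that is present but holds null, versus a key that holds a usable value) can be sketched in plain JavaScript. This is only an illustration of the concept using hypothetical documents and helper names (keyExists, hasUsableValue, pickId); it does not show how Filebeat evaluates has_fields internally.

```javascript
// Two hypothetical documents shaped like the ones discussed in this thread.
var withNull = { data: { p_id: null, id: "abc123" } };
var withValue = { data: { p_id: "xyz123", id: "abc123" } };

// True when the key is present at all, regardless of its value.
function keyExists(doc) {
  return Object.prototype.hasOwnProperty.call(doc.data, "p_id");
}

// True only when the key holds a usable (non-null, non-undefined) value.
function hasUsableValue(doc) {
  return doc.data.p_id !== null && doc.data.p_id !== undefined;
}

// Prefer data.p_id, fall back to data.id.
function pickId(doc) {
  return hasUsableValue(doc) ? doc.data.p_id : doc.data.id;
}

console.log(keyExists(withNull), hasUsableValue(withNull), pickId(withNull));
// prints: true false abc123
console.log(keyExists(withValue), hasUsableValue(withValue), pickId(withValue));
// prints: true true xyz123
```

An existence check alone cannot tell the two documents apart, which is why a check on the value itself is needed to choose the right field.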

Hey @stephenb - I tried your suggestion, but I do not think it is working. Here is exactly what I did:

processors:
     - "decode_json_fields": ...
     - if:
         has_fields: ["data.p_id"]
       then:
         - copy_fields:
             fields:
               - from: "data.p_id"
                 to: "temp_id"
       else:
          - copy_fields:
             fields:
               - from: "data.id"
                 to: "temp_id"
     - fingerprint:
         fields: ["temp_id"]
         target_field: "@metadata._id"
     - add_fields:
         target: "@metadata"
         fields:
           op_type: "index"
   - drop_fields: ["temp_id"]

I had to change my approach because fingerprint looks at the name of the field as well, while I only want to fingerprint on the value. As a result, I am creating a new field temp_id which will conditionally copy the value of either data.p_id if it is not null, or the value of data.id, if data.p_id is null. I will then fingerprint on this new temp_id and set that to be my document _id and then remove temp_id from the document.
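Taking the observation above at face value (that the fingerprint depends on the field name as well as the value), the reason for routing both values through a single temp_id field can be sketched as follows. The name|value serialization and the sketchFingerprint helper are assumptions made purely for illustration; the thread does not show what Filebeat actually hashes, and the real layout may differ.

```javascript
var crypto = require("crypto");

// Hypothetical fingerprint: SHA-256 over "name|value". Only the dependence
// on the field name matters for this illustration.
function sketchFingerprint(fieldName, value) {
  return crypto
    .createHash("sha256")
    .update(fieldName + "|" + value)
    .digest("hex");
}

// Same value hashed under two different field names gives different IDs.
var viaPId = sketchFingerprint("data.p_id", "abc123");
var viaId = sketchFingerprint("data.id", "abc123");

// Same value hashed under one shared field name gives the same ID.
var viaTempA = sketchFingerprint("temp_id", "abc123");
var viaTempB = sketchFingerprint("temp_id", "abc123");

console.log(viaPId === viaId);      // prints: false
console.log(viaTempA === viaTempB); // prints: true
```

Under that assumption, copying whichever value applies into one field before fingerprinting keeps the resulting _id independent of which source field supplied it.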

I ran a test to see if the correct conditional temp_id was being written to the document for fingerprinting - it is not. It always uses the data.p_id value, even if it is null. So, I am back to where I started.

I am new to Filebeat, so please let me know if there is something glaringly wrong with the config.

Thanks again for your help!

Hi @iFamZ

Can you provide samples of your data please....

Raw log lines, something to work from...

What version of the Elastic Stack and Filebeat are you on?

Also, provide your entire filebeat.yml....

I can be presented with two types of documents. My data can either be null for data.p_id, as in this case:

{
  field_a: ...,
  field_b: {...},
  data: {
    p_id: null,
    id: "abc123"
  }
  field_c: ...
}

In the above case, I want to fingerprint on my data.id.

Or my data.p_id can be populated with some information:

{
  field_a: ...,
  field_b: {...},
  data: {
    p_id: "xyz123",
    id: "abc123"
  }
  field_c: ...
}

In this case, I would like to fingerprint on my data.p_id.

Someone suggested I use the script processor, since it seems like checking if a value is null is not straightforward.

I am using Elastic Stack 8.6.0 and Filebeat 8.6.0 as well.

It is difficult for me to provide you with my complete config file, but the sanitized config snippet I posted before is the most relevant portion. Everything else is more or less generic.

this helps... BUT there are other items in your filebeat.yml that can matter

Like are you creating/using your own templates, datastream names etc.

Please show me the sanitized version of the output section...

I noticed you put in op_type are you planning on updating the documents?

I can only help what I can see... and yes other settings could influence how the id is set

There was a bug with fingerprint, op_type and index a couple of releases back...

Let me take a look and get back...

Yes, I am planning on updating the document. In fact, I would like to do something like this. Based on that link (and your solution to that post), the op_type is necessary if I would like to use the fingerprint as the document id, which is exactly what I am looking to do.

You will note that is my very long thread ........ Did you read it all :slight_smile:

So this is why context is important: just snippets of code and questions can lead to long, winding discussions that are wasteful. For example, you did not show the config for decode_json_fields, and that matters...

You need to read the thread from this part down.

Basically, updating documents in data streams does not work. You will need to use indices, not data streams, which is OK but definitely harder... and has lots of limitations.

I would suggest you closely read that thread.... the partial solution with filebeat for 8.6.0 should be here... you will have to use indices not data streams which adds more complexity...

I am happy to help, but I cannot really help if you do not provide me with complete and/or meaningful configs and data (yes, you can sanitize the fields), because to help I have to build samples and complete configs, and guess at things when you leave items out.

So your use case is that you want to read time series data and update documents... It can be done, but it is not easy... and there are pretty severe limitations. You cannot use ILM for time series data because the updates won't work unless you know the correct data, so you are limited to a single index... but we will leave that for another time....

Read the thread closely and I will get your processors to work; your syntax is not correct :slight_smile:

Here is my input:

{"field_a":"value a 1","data":{"p_id":null,"id":"abc123"},"field_c":"value c 1"}
{"field_a":"value a 2","data":{"p_id":"123445","id":"abc456"},"field_c":"value c 2"}

Here is my complete filebeat.yml, which prints to the console:


filebeat.inputs:

# filestream is an input for collecting log messages from files.
- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /Users/sbrown/workspace/sample-data/discuss/ndjson/sample-fingerprint.ndjson

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false


setup.kibana:

output.console:
  codec.json:
    pretty: true
# ---------------------------- Elasticsearch Output ----------------------------
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ================================= Processors =================================
processors:

  - decode_json_fields: 
      fields: ["message"]
      target: ""

  - script:
      lang: javascript
      source: >
        function process(event) {
          var value = event.Get("data.p_id");
          if (value !== null) {
            event.Put("temp_id", value);
          } else {
            event.Put("temp_id", event.Get("data.id"));
          }
        }

  - fingerprint:
      fields: ["temp_id"]
      target_field: "@metadata._id"

  - add_fields:
      target: "@metadata"
      fields:
        op_type: "index"

  - drop_fields: 
      fields: ["temp_id"]

And the output:

{
  "@timestamp": "2023-03-01T02:43:27.611Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.6.2",
    "_id": "46cc154a58a4972595f8b43a8bc1af95fb42b4ba61e9522a4bf8954a64483875",
    "op_type": "index"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "hyperion"
  },
  "agent": {
    "ephemeral_id": "3a97c0a8-2409-4535-be85-42ffa8c7dd4a",
    "id": "66df3886-486a-434b-bb8d-9f3035983f8d",
    "name": "hyperion",
    "type": "filebeat",
    "version": "8.6.2"
  },
  "field_a": "value a 1",
  "log": {
    "offset": 0,
    "file": {
      "path": "/Users/sbrown/workspace/sample-data/discuss/ndjson/sample-fingerprint.ndjson"
    }
  },
  "message": "{\"field_a\":\"value a 1\",\"data\":{\"p_id\":null,\"id\":\"abc123\"},\"field_c\":\"value c 1\"}",
  "input": {
    "type": "filestream"
  },
  "field_c": "value c 1",
  "data": {
    "p_id": null,
    "id": "abc123"
  }
}
{
  "@timestamp": "2023-03-01T02:43:27.611Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.6.2",
    "_id": "378c48f4c86340e2e287b5731a3cbc9aab9657b8c8f872bdd82f4f9a8c30e194",
    "op_type": "index"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "hyperion"
  },
  "agent": {
    "version": "8.6.2",
    "ephemeral_id": "3a97c0a8-2409-4535-be85-42ffa8c7dd4a",
    "id": "66df3886-486a-434b-bb8d-9f3035983f8d",
    "name": "hyperion",
    "type": "filebeat"
  },
  "field_a": "value a 2",
  "data": {
    "p_id": "123445",
    "id": "abc456"
  },
  "message": "{\"field_a\":\"value a 2\",\"data\":{\"p_id\":\"123445\",\"id\":\"abc456\"},\"field_c\":\"value c 2\"}",
  "log": {
    "offset": 81,
    "file": {
      "path": "/Users/sbrown/workspace/sample-data/discuss/ndjson/sample-fingerprint.ndjson"
    }
  },
  "input": {
    "type": "filestream"
  },
  "field_c": "value c 2"
}
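The null check in the script processor above can also be exercised outside Filebeat. The sketch below runs the same process function under Node.js against the two sample input lines. MockEvent is a hypothetical stand-in written for this example (it is not part of Filebeat) and only mimics the Get and Put calls the script relies on, returning null for both null-valued and missing fields.

```javascript
// Hypothetical stand-in for the Filebeat event object. It only supports
// dotted-path Get and Put, which is all process() needs.
function MockEvent(fields) {
  this.fields = fields;
}

// Walk a dotted path such as "data.p_id"; return null when any part is missing.
MockEvent.prototype.Get = function (path) {
  var parts = path.split(".");
  var current = this.fields;
  for (var i = 0; i < parts.length; i++) {
    if (current === null || typeof current !== "object" || !(parts[i] in current)) {
      return null;
    }
    current = current[parts[i]];
  }
  return current === undefined ? null : current;
};

// Set a value at a dotted path, creating intermediate objects as needed.
MockEvent.prototype.Put = function (path, value) {
  var parts = path.split(".");
  var current = this.fields;
  for (var i = 0; i < parts.length - 1; i++) {
    if (current[parts[i]] === null || typeof current[parts[i]] !== "object") {
      current[parts[i]] = {};
    }
    current = current[parts[i]];
  }
  current[parts[parts.length - 1]] = value;
};

// Same logic as the script processor in the config above.
function process(event) {
  var value = event.Get("data.p_id");
  if (value !== null) {
    event.Put("temp_id", value);
  } else {
    event.Put("temp_id", event.Get("data.id"));
  }
}

// The two sample input lines from this thread.
var lines = [
  '{"field_a":"value a 1","data":{"p_id":null,"id":"abc123"},"field_c":"value c 1"}',
  '{"field_a":"value a 2","data":{"p_id":"123445","id":"abc456"},"field_c":"value c 2"}'
];

var tempIds = lines.map(function (line) {
  var event = new MockEvent(JSON.parse(line));
  process(event);
  return event.Get("temp_id");
});

console.log(tempIds); // prints: [ 'abc123', '123445' ]
```

The first record falls back to data.id because data.p_id is null, and the second uses data.p_id, which is consistent with the two different _id values in the console output above.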

I did not know that the decode_json_fields settings were relevant to my issue; I can definitely post them. This should cover all the relevant information from filebeat.yml.

filebeat.inputs:
- type: gcp-pubsub
  project_id: a
  topic: b
  subscription_name: c
  credentials_file: /to/credentials/cred.json

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:

output.elasticsearch:
  hosts: ["localhost"]
  index: "test_index"
  username: "elastic"
  password: "password"
  ssl:
    enabled: true
    ca_trusted_fingerprint: "767...09"

processors:
  - decode_json_fields:
    fields: ["message"]
    process_array: true
    max_depth: 4
    target: ""
    overwrite_keys: true
  - if:
      has_fields: ["data.p_id"]
    then:
      - copy_fields:
          fields:
            - from: "data.p_id"
              to: "temp_id"
    else:
      - copy_fields:
          fields:
            - from: "data.id"
              to: "temp_id"
  - drop_fields:
      fields: ["host", "event", "message", "ecs", "agent", "input", "temp_id"]

Yeah, I am very new to Filebeat, and was going off the linked post for what I wanted to do.

Again, I appreciate all the help!

Awesome, so the script processor is the way to go - the other processors cannot check if a value is null?

It does not seem so. Plus, your whole if/then syntax was incorrect.

The next thing you will run into is that you can't just set the index name. You need to look at the solution I posted; there are a number of other settings you need.

You can't just do this without the template settings I showed in that thread:

  hosts: ["localhost"]
  index: "test_index"
  username: "elastic"
  password: "password"