Filebeat and updating existing docs

Hi,

I'm trying to update documents when they already exist. Is that possible with Filebeat?
Logstash has that functionality...

output {
  elasticsearch {
    doc_as_upsert => true
    document_id => "%{fingerprint}"
  }
}

The fingerprint is calculated from two fields

Is this possible in Filebeat?
Or do I have to send logs through Logstash (filebeat -> logstash -> elasticsearch)?

I have tried to follow the recommendations from: Deduplicate data | Filebeat Reference [8.5] | Elastic

But when I set @metadata._id to the calculated fingerprint, new logs do not appear...
There are no errors in the Filebeat logs (events are reported as successfully published to Elasticsearch; logging.level: debug).

Log flow : Filebeat -> Elasticsearch
Filebeat 7.17
Elasticsearch 8.4

Perhaps share your filebeat configuration so we can take a look and perhaps we can help.

What are you using for the id? Are you generating one? Are you using some combination of fields?

Share your configuration and tell us a little bit more and perhaps we can help.

The ID is generated from a combination of two fields.

The input file is a CSV, delimited by tabs.

filebeat config

- type: log
  enabled: true
  paths:
    - /opt/incoming/data/*-operator_prefix.*
  fields:
     app: some_app
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name", "test.prefix"]
        target_field: "@metadata._id"
        method: "sha1"


output.elasticsearch:
  pipelines:
    - pipeline: some-app-pipe
      when.equals:
        app: "some-app"

Ingest pipeline

{
  "some-app-pipe": {
    "processors": [
      {
        "set": {
          "field": "ecs.version",
          "value": "1.5.0"
        }
      },
      {
        "set": {
          "field": "_index",
          "value": "{{app}}-write"
        }
      },
      {
        "pipeline": {
          "name": "field_enrich-log_file"
        }
      },
      {
        "set": {
          "field": "event.ingested",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "event.kind",
          "value": "pipeline_error"
        }
      },
      {
        "set": {
          "field": "error",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "type": "{{_ingest.on_failure_processor_type}}",
            "code": "{{_ingest.on_failure_processor_tag}}"
          }
        }
      },
      {
        "set": {
          "field": "_index",
          "value": "errors-{{app}}"
        }
      }
    ]
  }
}

Apologies, I should have asked: are there errors in the filebeat logs? Did you look?

Also, did you try debugging without the ingest pipeline, just to reduce the variables?
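For example (just a sketch, not your full config), you could temporarily swap output.elasticsearch for the console output, so you can see exactly what is being published with no ingest pipeline in the picture (Filebeat only allows one output at a time, so comment out output.elasticsearch while this is enabled):

output.console:
  pretty: true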

There are no errors in the filebeat logs.

For every log line I see:

DEBUG [processors] processing/processors.go:203 Publish event: {

Additional logs

filebeat[26461]: INFO [publisher_pipeline_output] pipeline/output.go:143 Connecting to backoff(elasticsearch(https://(.....)))
filebeat[26461]: DEBUG [esclientleg] eslegclient/connection.go:261 ES Ping(url=https://(......))
filebeat[26461]: INFO [publisher] pipeline/retry.go:219 retryer: send unwait signal to consumer
filebeat[26461]: INFO [publisher] pipeline/retry.go:223   done

In the ingest pipeline I'm forcing the destination index name.

You only provided snippets of logs so it's very hard for me to tell / help.

You also didn't show me a sanitized version of what the publisher is actually publishing?

You show this ... But leave out all the interesting stuff :slight_smile:

DEBUG [processors] processing/processors.go:203 Publish event: {

I would remove the pipeline, run with -d "*"

Does it look correctly dissected?

Do you see the _id set?

Here is my sample... it works:
It creates a fingerprint
It assigns the _id, and when written to Elasticsearch that _id is used

- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - "/Users/sbrown/workspace/customers/acme/sample-data/catalina.out"

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"

And when I run
filebeat -e -d "*"

I see the below in the filebeat console (you have to format it a bit). This is what is published to Elasticsearch, and it has the _id set.

{
	"@timestamp": "2022-12-10T16:24:41.400Z",
	"@metadata": {
		"beat": "filebeat",
		"type": "_doc",
		"version": "8.4.3",
		"_id": "6d084d6706c79dfb95ff164a9bf6abb0661ed5ee" <!---- Yup _id Set to fingerprint)
	},
	"message": "2022-11-09 01:26:17 INFO  Interesting Log Message",
	"input": {
		"type": "filestream"
	},
	"host": {
		"ip": [

So if you do not see this... nothing downstream will work.

I suspect one of the processors is failing for you.

Sample publish event log

filebeat[26461]: DEBUG [processors] processing/processors.go:203 Publish event: {
                                                "@timestamp": "2022-12-10T10:10:28.695Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "7.17.5",
                                                  "_id": "791f529272c4e95fbba6273ace4c3e6db0b2944e"
                                                },
                                                "app": "app1",
                                                "test": {
                                                  "machine": {
                                                    "name": "1",
                                                    "description": "Test machine description2"
                                                  },
                                                  "prefix": "118fa800"
                                                }
                                              }
filebeat[26461]: DEBUG [processors] processing/processors.go:203 Publish event: {
                                                "@timestamp": "2022-12-10T10:10:28.695Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "7.17.5",
                                                  "_id": "fce6bad14a37b3dfe55a1b58f2393dc5402c6446"
                                                },
                                                "app": "app1",
                                                "test": {
                                                  "prefix": "118beea811",
                                                  "machine": {
                                                    "name": "1",
                                                    "description": "Test machine description"
                                                  }
                                                },
                                                "ecs": {
                                                  "version": "1.12.0"
                                                }
                                              }

The dissect & fingerprint processors are working properly.

When I do not send logs to the specific pipeline (where I'm setting the index name), which index will contain the sent data?

The events look good...

Apologies, I'm not quite sure I understand?

My question is: if you don't use any pipelines, is the correct ID set? That would be the next step to debug.

These do not match.

some_app
vs
some-app
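
In other words, the value you set on the event and the value the output condition tests have to be identical. A minimal sketch using the names from your config (pick either spelling, just use the same one in both places):

fields:
  app: some-app                  # value added to the event...
fields_under_root: true

output.elasticsearch:
  pipelines:
    - pipeline: some-app-pipe
      when.equals:
        app: "some-app"          # ...must match the value tested here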

You gave me an example filebeat configuration where you are crawling "/Users/sbrown/workspace/customers/acme/sample-data/catalina.out".

I assume those logs aren't going to any ingest pipeline.

In which index will the logs be available? filebeat-*? Any other?

That was just an example, and actually I did have a pipeline, but I took it out.

It appears you are trying to route to a custom index kind of as a "workaround" in the ingest pipeline... there are probably some other implications with that... I would probably back up a bit and try to understand what you are trying to accomplish.

We often see segmenting based on app etc... sometimes it is useful, sometimes not, and there are different strategies to accomplish that etc... etc... and often we see teams start with segmentation on the ingest side only to realize the same, and possibly more, can be accomplished just as easily on the read / query side. Either way is valid... but often I tell folks just getting started... use the defaults, learn, iterate.

At this point, I am just trying to answer your questions because I am not clear what you are trying to accomplish.

Well in 8.4 it's a data stream...

The data stream will be for example

filebeat-8.4.3

But the actual backing index will be a hidden .ds-* index.

GET _cat/indices/*?v

health status index                                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .ds-filebeat-8.4.3-2022.12.10-000001 ptIW3qCLRpyVvNu88nBplw   1   1         28            0       26kb           26kb

You can see them in Kibana Stack Management -> Index Management

Ohh Wait you are using Filebeat 7.17 ... then it will be a concrete index like

filebeat-7.17.6-2022.12.10-000001

You are using mixed modes (7.x plus 8.x), which also makes things a bit more confusing... 7.x is index centric and 8.x is data stream centric:
7.x Filebeat will write an index in 8.x Elasticsearch
8.x Filebeat will write a data stream in 8.x Elasticsearch
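
If you want to double-check which one you actually ended up with, something like this in Kibana Dev Tools shows both sides (the names are just examples; your date / rollover suffix will differ):

GET _cat/indices/filebeat-7.17.*?v

GET _data_stream/filebeat-8.4.3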

I'm trying to update existing documents in elasticsearch, based on @metadata._id

Example

First published event

filebeat[26461]: DEBUG [processors] processing/processors.go:203 Publish event: {
                                                "@timestamp": "2022-12-10T10:10:28.695Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "7.17.5",
                                                  "_id": "**791f529272c4e95fbba6273ace4c3e6db0b2944e**"
                                                },
                                                "app": "app1",
                                                "test": {
                                                  "machine": {
                                                    "name": "1",
                                                    "description": "Test machine description2"
                                                  },
                                                  "prefix": "118fa800"
                                                }
                                              }

That document should be created in Elasticsearch, because a doc with id 791f529272c4e95fbba6273ace4c3e6db0b2944e does not exist.

After some time... filebeat crawls a new file, with a new test.machine.description

filebeat[26461]: DEBUG [processors] processing/processors.go:203 Publish event: {
                                                "@timestamp": "2022-12-12T10:10:28.695Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "7.17.5",
                                                  "_id": "**791f529272c4e95fbba6273ace4c3e6db0b2944e**"
                                                },
                                                "app": "app1",
                                                "test": {
                                                  "prefix": "118fa800",
                                                  "machine": {
                                                    "name": "1",
                                                    "description": "**New test machine description2**"
                                                  }

The ID is the same as in the previous event, so the document should be updated (that's what I'm trying to accomplish).

Unfortunately, when I'm setting

    - fingerprint:
        fields: ["test.machine.name", "test.prefix"]
        target_field: "@metadata._id"   # <-- that line is problematic
        method: "sha1"

the doc never appears in Kibana... or I don't know where I can find it.

This topic is similar to :

Hi @Marcin_Frankiewicz Thanks for the details, I think I do understand what you are trying to accomplish.

I am editing this post; I was looking at 8.5 with data streams, and I need to re-look at 7.17.x.

BTW this will NOT work in 8.x because Data Streams are Append Only and Do Not Support Updates, which is really what you are doing... (Upsert)

Let me look closer at 7.17.x

I can definitely write a doc with the fingerprint, but I seem to be struggling to update it with the same _id.

You should focus on the initial write.. that works for sure... you need to debug that..

When you say...

How exactly do you determine that it does not exist..
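
For example (just a sketch; this searches every index you can see, so it will find the doc wherever your pipeline routed it):

GET _search
{
  "query": {
    "ids": { "values": ["791f529272c4e95fbba6273ace4c3e6db0b2944e"] }
  }
}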

Also if you take out the fingerprint do docs get written...

This works for sure ...

I am looking / working on the updates, which I am having issues with as well.

I do know how to make this work with logstash as it has a specific doc_as_upsert flag

Wow, I spent a lot of time on this... I have a question in to engineering... I have no problem getting the fingerprint to work on the initial index, but I cannot get it to update the document. I have a suspicion why, but I will wait. I tried everything I know, including rarely used configurations, and I could not get it to work...

Apologies about the difficulty. I agree Filebeat / Elasticsearch is not working as documented / described here.

So in the meantime, if you want to set your own _id and update the documents when needed, this will and does work. I tested this on 7.17.x (it actually changes some in 8.x). This is what people have been doing for a long time with Logstash. Logstash provides granular control over the index actions.

The architecture will be Filebeat->Logstash->Elasticsearch

Below I have included a filebeat.yml and a Logstash pipeline config (beats-logstash.conf), and I put comments in the filebeat.yml.

The process (a rough command sketch follows these steps):

  1. Clean up any existing indices etc.
  2. Configure filebeat to point at Elasticsearch
  3. Run filebeat setup -e
  4. Configure filebeat to point to Logstash (see the config)
  5. Start Logstash with the configuration I provided... you can read about the settings I used here
  6. Start filebeat however you do
  7. As new documents come in with the same @metadata._id they will be updated
  8. I tested this and it does work for sure
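
Roughly, the commands end up looking like this (a sketch; paths and how you launch Logstash / Filebeat will differ in your environment):

# step 3: with output.elasticsearch enabled, load the index template / dashboards
filebeat setup -e

# step 5: start Logstash with the pipeline config below
bin/logstash -f beats-logstash.conf

# step 6: with output.logstash enabled, start shipping
filebeat -e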

filebeat.yml

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.


- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  #pipeline: onsemi-catalina-base
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    # - "/Users/sbrown/workspace/customers/onsemi/sample-data/ELK_Log_Samples_TC1/TC1_Mapper/MapperLog_2022-10-18_08-09_UV5_22156F8G001.000.small.txt"
    - "/Users/sbrown/workspace/customers/onsemi/sample-data/catalina.out"
    # - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*

  parsers:
    - multiline:
        type: pattern
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
# output.console:

############
# UNCOMMENT output.elasticsearch and run filebeat setup -e FIRST and then comment out to run Logstash
############
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]
  # pipeline: discuss-id

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ------------------------------ Logstash Output -------------------------------
############
# Comment out output.logstash when running setup, uncomment output.logstash when running
############
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

beats-logstash.conf

################################################
# beats->logstash->es default config.
################################################
input {
  beats {
    port => 5044
  }
}

output {
  stdout {}
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "http://localhost:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => "%{[@metadata][pipeline]}" 
      # user => "elastic"
      # password => "secret"
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      action => update
    }
  } else {
    elasticsearch {
      hosts => "http://localhost:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      # user => "elastic"
      # password => "secret"
      document_id => "%{[@metadata][_id]}" 
      doc_as_upsert => true
      action => update
    }
  }
}

Thank you very much for the investigation :slight_smile:
Your working example may help other people deal with this problem (updating documents with Filebeat -> Logstash -> Elasticsearch).

OK

I have debugged that, and I found the "lost" docs in the filebeat indices (when I'm not using the ingest pipeline), with the ID generated by the fingerprint.

After some tests I'm able to write docs with the generated fingerprint too, but I cannot get the document to update.

That's the main purpose of this topic :slight_smile:
Updating documents without Logstash.

Is it a bug? Or a mistake in the documentation?

I don't know. I'm waiting for a response... And it's the end of the year and kind of busy, so I'm not sure exactly when I will hear back.

You could certainly open a bug against the repo if you like.

There is a combination of things going on...

  • There is logic that checks the operation type: create (create only) or index (create or update)
  • Since we are setting the _id, the operation gets set to create, which explains the behavior (see the bulk sketch below)
  • We can manually set the op_type BUT there is a bug with that, so that does not work (that is what I was trying)
  • The bug has been fixed in later versions but not back-ported to 7.17.x ... there is now a PR for that BUT you will need to update to that version when it comes out.
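
To illustrate the create vs index difference at the Elasticsearch level (an illustration only, not literally what Filebeat sends; the index name is taken from the example above):

POST filebeat-7.17.6-2022.12.10-000001/_bulk
{ "create": { "_id": "791f529272c4e95fbba6273ace4c3e6db0b2944e" } }
{ "test": { "machine": { "description": "Test machine description2" } } }
{ "index": { "_id": "791f529272c4e95fbba6273ace4c3e6db0b2944e" } }
{ "test": { "machine": { "description": "New test machine description2" } } }

The create action fails with a version conflict when a document with that _id already exists; the index action creates or overwrites it, which is the update-in-place behavior you are after.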

You will need to follow that PR....

In short, your only solution today is to wait for that (and then set @metadata.op_type: index) or use the Logstash solution I showed you.

The docs are no longer correct, I am not sure when that would get fixed.


That's great news :slight_smile:
I will wait for the new version.

In higher Filebeat versions (8.x), can it be done the same way?

See this

Beats are all data streams from 8.x on.

The only way I know of at this time to do what you want will be to use the Logstash approach and not use data streams.

I will keep my eyes open for another approach.


@Marcin_Frankiewicz Huh :slight_smile: !

I think I got it to work in 8.5.3 with a bit of a workaround / hack using normal indices (not data streams)... and no Logstash...

You will need to manually set up your index template etc. as a normal index / not a data stream... and leave in the settings below so you can write to an index name, not a data stream.

Then, strangely, this all works pretty simply because the logic in that bug is fixed... @metadata._id plus @metadata.op_type: "index"

This worked! It updated the Doc in Place!

filebeat.inputs:

- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  paths:
    - "/Users/sbrown/workspace/customers/onsemi/sample-data/catalina.out"

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"
    - add_fields:
        target: "@metadata"
        fields:
          op_type: "index"

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "customname-index"
setup.ilm.enabled: false
setup.template.enabled: true
setup.template.name: "customname"
setup.template.pattern: "customname-*"
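
A quick way to confirm the in-place update: ship the same line twice, and the doc count stays the same while _version increments. For example (the _id here is just the fingerprint from my earlier sample event; yours will differ):

GET customname-index/_doc/6d084d6706c79dfb95ff164a9bf6abb0661ed5ee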