Filebeat and updating existing docs

Wow, I spent a lot of time on this... I have a question in to engineering. I have no problem getting the fingerprint to work on the initial indexing, but I cannot get it to update the document. I have a suspicion why, but I will wait. I tried everything I know, including rarely used configurations, and I could not get it to work...

Apologies about the difficulty; I agree Filebeat / Elasticsearch is not working as described in the documentation here.

So in the meantime, if you want to set your own _id and update the documents when needed, the approach below works. I tested this on 7.17.x (it changes somewhat in 8.x). This is what people have been doing for a long time with Logstash, since Logstash provides granular control over the index actions.

The architecture will be Filebeat->Logstash->Elasticsearch

Below I have included a filebeat.yml and a Logstash pipeline config (beats-logstash.conf), with comments in the filebeat.yml.

The process:

  1. Clean up any existing indices, etc.
  2. Configure Filebeat to point at Elasticsearch
  3. Run filebeat setup -e
  4. Configure Filebeat to point at Logstash (see the config)
  5. Start Logstash with the configuration I provided... you can read about the settings I used here
  6. Start Filebeat however you usually do
  7. As new documents come in with the same @metadata._id, they will be updated
  8. I tested this and it does work for sure

filebeat.yml

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.


- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  #pipeline: onsemi-catalina-base
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    # - "/Users/sbrown/workspace/customers/onsemi/sample-data/ELK_Log_Samples_TC1/TC1_Mapper/MapperLog_2022-10-18_08-09_UV5_22156F8G001.000.small.txt"
    - "/Users/sbrown/workspace/customers/onsemi/sample-data/catalina.out"
    # - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*

  parsers:
    - multiline:
        type: pattern
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
# output.console:

############
# UNCOMMENT output.elasticsearch and run filebeat setup -e FIRST and then comment out to run Logstash
############
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]
  # pipeline: discuss-id

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ------------------------------ Logstash Output -------------------------------
############
# Comment out output.logstash when running setup, uncomment output.logstash when running
############
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

beats-logstash.conf

################################################
# beats->logstash->es default config.
################################################
input {
  beats {
    port => 5044
  }
}

output {
  stdout {}
  # If Filebeat attached a module ingest pipeline, pass it through to Elasticsearch
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "http://localhost:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => "%{[@metadata][pipeline]}"
      # user => "elastic"
      # password => "secret"
      # Use the fingerprint set in Filebeat as the document _id,
      # and update the document in place if it already exists
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      action => update
    }
  } else {
    elasticsearch {
      hosts => "http://localhost:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      # user => "elastic"
      # password => "secret"
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      action => update
    }
  }
}

Thank you very much for investigating :slight_smile:
Your working example may help other people deal with this problem (updating documents with Filebeat -> Logstash -> Elasticsearch).

OK

I have debugged this, and I found "lost" docs in the Filebeat indices (when I'm not using an ingest pipeline), with IDs generated by fingerprint.

After some tests I am also able to write docs with the generated fingerprint ID, but I cannot get the documents to update.

That is the main purpose of this topic :slight_smile:
Updating documents without Logstash.

Is it a bug? Or a mistake in the documentation?

I don't know. I'm waiting for a response... and it's the end of the year and kind of busy, so I'm not sure exactly when I will hear back.

You could certainly open a bug against the repo if you like.

There is a combination of things going on...

  • There is logic that checks the operation type: create (create only) or index (create or update)
  • Since we are setting the _id, the operation gets set to create, which explains the behavior
  • We can manually set the op_type, BUT there is a bug with that, so it does not work (that is what I was trying)
  • The bug has been fixed in later versions but not back-ported to 7.17.x... there is now a PR for that, BUT you will need to update to that version when it comes out.

You will need to follow that....

In short, your only solution today is to wait for that release and set @metadata.op_type: index, or use the Logstash solution I showed you.
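For reference, this is the shape of the config that should work once that fix ships (a sketch: the same fingerprint processor as above, plus an add_fields processor to set the op_type; on 7.17.x today it hits the bug):

processors:
  # Hash the message into @metadata._id so re-reads of the same line get the same _id
  - fingerprint:
      fields: ["message"]
      target_field: "@metadata._id"
      method: "sha1"
  # Ask for op_type index (create or update) instead of the default create
  - add_fields:
      target: "@metadata"
      fields:
        op_type: "index"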

The docs are no longer correct; I am not sure when that will get fixed.


That's great news :slight_smile:
I will wait for the new version.

Can it be done the same way in newer Filebeat versions (8.x)?

See this

Beats all write to data streams from 8.x on.

The only way I know of at this time to do what you want will be to use the Logstash approach and not use data streams.

I will keep my eyes open for another approach.


@Marcin_Frankiewicz Huh :slight_smile: !

I think I got it to work in 8.5.3 with a bit of a workaround / hack using normal indices (not data streams)... and no Logstash...

You will need to manually set up your index template etc. as a normal index / not a data stream... and leave in the settings below so you can write to an index name, not a data stream.

Then, strangely, this all works pretty simply because the logic in that bug is fixed: @metadata._id plus @metadata.op_type: "index".

This worked! It updated the Doc in Place!

filebeat.inputs:

- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  paths:
    - "/Users/sbrown/workspace/customers/onsemi/sample-data/catalina.out"

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"
    - add_fields:
        target: "@metadata"
        fields:
          op_type: "index"

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "customname-index"
setup.ilm.enabled: false
setup.template.enabled: true
setup.template.name: "customname"
setup.template.pattern: "customname-*"

Does your solution work on Filebeat 8.4.3?

Did I properly try to force the destination index via @metadata._index?

filebeat[15854]: 2022-12-29T10:30:30.992+0100 DEBUG [processors] map[file.line:210 file.name:processing/processors.go] Publish event: {
                                                "@timestamp": "2022-12-29T09:30:30.992Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "8.4.3",
                                                  "_id": "0ed422b9bbf338abf372400bb348e2ac669fe22a",
                                                  "op_type": "index",
                                                  "_index": "ok-app1-write"
                                                },
                                                "test": {
                                                  "machine": {
                                                    "description": "MF Operator sp.zoo NEW_NAME2",
                                                    "name": "3"
                                                  },
                                                  "prefix": "122000"
                                                },
                                                "ecs": {
                                                  "version": "8.0.0"
                                                },
                                                "log": {
                                                  "file": {
                                                    "path": "/opt/some_file.csv"
                                                  }
                                                },
                                                "message": "122000\t3\tMF Operator sp.zoo NEW_NAME2",
                                                "app": "app1"
                                              }        {"ecs.version": "1.6.0"}

filebeat.yml

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"
          _index: "ok-app1-write"

Now I'm receiving an error (maybe because Filebeat tries to send data to the filebeat-* data stream):

2022-12-29T10:44:32.344+0100 WARN [elasticsearch] map[file.line:429 file.name:elasticsearch/client.go] Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 327927250, time.Local), Meta:{"_id":"0ed422b9bbf338abf372400bb348e2ac669fe22a","_index":"ok-app1-write","op_type":"index"}, Fields:{"app":"app1","ecs":{"version":"8.0.0"},"test":{"machine":{"description":"MF Operator sp.zoo NEW_NAME3","name":"3"},"prefix":"122000"},"log":{"file":{"path":"/opt/some_file.csv"}},"message":"122000\t3\tMF Operator sp.zoo NEW_NAME3"}, Private:file.State{Id:"native::942404-64768", PrevId:"", Finished:false, Fileinfo:(*os.fileStat)(0xc000a9d6c0), Source:"/opt/some_file.csv", Offset:213, Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 323541297, time.Local), TTL:-1, Type:"log", Meta:map[string]string(nil), FileStateOS:file.StateOS{Inode:0xe6144, Device:0xfd00}, IdentifierName:"native"}, TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:mapstr.M(nil)}} (status=400): {"type":"illegal_argument_exception","reason":"only write ops with an op_type of create are allowed in data streams"}, dropping event!        {"ecs.version": "1.6.0"}

My environment has been upgraded to 8.4.3 (Filebeat & Elasticsearch).

ok-app1-write is a write alias to the real index:

PUT ok-app1-000001
{
  "aliases": {
    "ok-app1-write": {
      "is_write_index": true
    }
  }
}

Please show me your entire filebeat.yml. That is not really the correct way to set the index.

And yes, that error indicates that you're trying to write to a data stream because you did not set the index in the output section.

Also, did you create your own template etc?

filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"
          _index: "ok-app1-write"

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  ....

- type: log
  enabled: true
  paths:
    - /opt/other_logs2/*.csv
  Many other input type log........

processors:
- drop_fields:
    fields: ["log.offset", "agent", "input", "source", "host"]
    ignore_missing: true

#====================

setup.template.enabled: false
setup.template.ilm.enabled: false
setup.ilm.enabled: false
logging.metrics.enabled: false
name: vm9


#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:

  hosts: ["elasticsearch_hosts"]
  protocol: "https"
  username: ""
  password: ""
  ssl.certificate_authorities: [""]
  bulk_max_size: 400
  workers: 2

#================================ Logging =====================================
logging.to_syslog: true
logging.to_files: true
logging.level: "debug"
logging.selectors: [ "*" ]
logging.files:
  name: filebeat
  rotateeverybytes: 10485760
  keepfiles: 10

http.enabled: true
monitoring.enabled: false

Templates and a typical index (not a data stream) are created.

Usually I'm setting the index name via an ingest pipeline:

pipelines:
  - pipeline: some-pipeline
    when.equals:
      app: "app1"

Here I'm trying to set the index name directly in Filebeat.

Hi @Marcin_Frankiewicz

What you currently have configured will not work (as you can see): as soon as you do not name an index in the output section, the default data stream is assumed, and that is the issue.

This whole business of trying to force the index name in the document itself is an anti-pattern; perhaps we can figure out a better way...

Pretty sure we can do what you want... but I need a bit better understanding.

Can you clarify a few things for me:

  1. Are you trying to route different documents to different indices based on a field in the message?

  2. Or are you just trying to route them to different pipelines for different processing?

  3. Or both... I feel like I am feeling around the elephant blindfolded :slight_smile:

  4. Note for later: to update a document you need to know the exact index, so if the write alias rolls over to a new index, I don't think this will work....

Can you kinda give me some pseudo code / logic of what you are trying to do?

But if you just want to crash in and try, you will need to set the following... you must set the index in the output section, and I would take out the add_fields _index: "ok-app1-write".

This requires that you already have an index template that creates normal indices not data streams.
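For example, a minimal index template for that could look like this sketch (the name and pattern are illustrative; adjust them to your naming). Because it has no data_stream section, matching indices are created as normal indices:

PUT _index_template/ok-app1
{
  "index_patterns": ["ok-app1-*"],
  "priority": 200,
  "template": {
    "settings": {
      "index.number_of_shards": 1
    }
  }
}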

And I validated it does work on 8.4.3 Stack

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "ok-app1-write"

setup.ilm.enabled: false
setup.template.name: "ok-app1-write"
setup.template.pattern: "ok-app1-write-*"

I'm trying to route different documents to different indices by the app field.

filebeat.yml

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  fields:
     app: app2

Documents which contain app: app1 should go to the ok-app1-write index.
Documents which contain app: app2 should go to the ok-app2-write index.

To get this I am trying two approaches:

  1. Force the index name via @metadata._index (as you mentioned, it doesn't work)

  2. Force the index name by routing to a specific ingest pipeline. That pipeline will set the index name.

filebeat.yml

pipelines:
  - pipeline: some-pipeline1
    when.equals:
      app: "app1"
  - pipeline: some-pipeline2
    when.equals:
      app: "app2"

ingest pipeline

    "processors": [
      {
        "set": {
          "field": "ecs.version",
          "value": "1.5.0"
        }
      },
      {
        "set": {
          "field": "_index",
          "value": "ok-{{app}}-write"
        }
      },
      {
        "set": {
          "field": "event.ingested",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ],
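For context, that fragment sits inside a full pipeline definition, something like this sketch (assuming the pipeline is named some-pipeline1):

PUT _ingest/pipeline/some-pipeline1
{
  "description": "set per-app destination index",
  "processors": [
    {
      "set": {
        "field": "_index",
        "value": "ok-{{app}}-write"
      }
    }
  ]
}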

I think that Logstash can do that... (update documents with a write alias) (doc_as_upsert).
I don't have proof of that right now (in the future I will try to deliver it).

Thanks for the information

  1. The simplest way to solve this is to put the index name in the input; each input can define an index that will be used in the output. Sorry, I should have shown you that earlier, but we were distracted by the bug. You can also define a pipeline for each input if you want; see the sketch below.

This works; I tested it. No more trying to hack the index name.
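Something like this sketch (the index and pipeline names are placeholders):

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
    app: app1
  # Per-input destination index and ingest pipeline
  index: ok-app1-write
  pipeline: ok-app1-pipeline

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  fields:
    app: app2
  index: ok-app2-write
  pipeline: ok-app2-pipeline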

Each index name could be a write alias if you intend to use ILM / rollover, which is OK, just a tad more work... if no rollover / ILM is needed, then they would just be concrete indices.

OK, here is the subtlety: suppose you are using ILM / rollover via a write alias...

Today you write a document with _id: 1234567890, and the write alias ok-app1-write is actually pointing to ok-app1-write-2022.12.30-000001.

Then tomorrow you get an update with _id: 1234567890, but now ok-app1-write is actually pointing to ok-app1-write-2022.12.31-000002.

The original document is in ok-app1-write-2022.12.30-000001, but you are now writing to a different index... you will then get a duplicate document in ok-app1-write-2022.12.31-000002. It won't magically find the _id in the other index... that is what I am trying to tell you.

That is independent of this filebeat or logstash method...
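If you want to check whether that has happened, you can search for the _id across all the backing indices (a sketch using the example _id from above):

GET ok-app1-write-*/_search
{
  "query": {
    "ids": {
      "values": ["1234567890"]
    }
  }
}

Two hits in two different backing indices means you have the duplicate.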

OK, now I understand how it works...

I will try to set the index name in the input section, but... setting the index name via an ingest pipeline has one advantage.

If something goes wrong in the ingest pipeline (a processor error), then the document should be redirected (depending on the app field) to another index:

error-app1-write

or

error-app2-write

It is possible via the on_failure section:

"on_failure" : [
      {
        "set" : {
          "field" : "processor_error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      },
      {
        "set" : {
          "field" : "processor_error.processor_type",
          "value" : "{{ _ingest.on_processor_type }}"
        }
      },
      {
        "set" : {
          "field" : "processor_error.processor_tag",
          "value" : "{{ _ingest.on_processor_tag }}"
        }
      },
      {
        "set" : {
          "field" : "_index",
          "value" : "error-{{app}}-write"
        }
      }
    ]

Sure, perhaps... you will have to test it... depending on the error; some errors only happen when the document is actually written to the index. But yes, I guess you can do that. Most folks just tag the record with a failure and write to the same index, which is then easily viewed with a filter... instead of hunting between multiple indices... but it seems valid.
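For example, the tag-and-keep variant of your on_failure block would look something like this sketch (the pipeline_error tag value is arbitrary); the document stays in its normal index, and a simple tags filter surfaces the failures:

"on_failure": [
  {
    "append": {
      "field": "tags",
      "value": "pipeline_error"
    }
  },
  {
    "set": {
      "field": "processor_error.message",
      "value": "{{ _ingest.on_failure_message }}"
    }
  }
]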

The great thing about Elastic... it is an open, flexible platform!

If I set the index name in the Filebeat input section

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  index: ok-app1-write
  pipeline: ok-app1-pipeline

then can the on_failure directive in the ingest pipeline still change the index name? (Redirect a buggy document to the index error-app1-write if some operation in the ingest pipeline fails?)

I don't want to have buggy documents in the ok-app1-write index... (only in error-app1-write).

You should be able to... since you are in edge-case territory, I would test it... best way to find out.

1 Like

I did some tests with your tips, and here are my results.

Previously I had the error below:

2022-12-29T10:44:32.344+0100 WARN [elasticsearch] map[file.line:429 file.name:elasticsearch/client.go] Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 327927250, time.Local), Meta:{"_id":"0ed422b9bbf338abf372400bb348e2ac669fe22a","_index":"ok-app1-write","op_type":"index"}, Fields:{"app":"app1","ecs":{"version":"8.0.0"},"test":{"machine":{"description":"MF Operator sp.zoo NEW_NAME3","name":"3"},"prefix":"122000"},"log":{"file":{"path":"/opt/some_file.csv"}},"message":"122000\t3\tMF Operator sp.zoo NEW_NAME3"}, Private:file.State{Id:"native::942404-64768", PrevId:"", Finished:false, Fileinfo:(*os.fileStat)(0xc000a9d6c0), Source:"/opt/some_file.csv", Offset:213, Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 323541297, time.Local), TTL:-1, Type:"log", Meta:map[string]string(nil), FileStateOS:file.StateOS{Inode:0xe6144, Device:0xfd00}, IdentifierName:"native"}, TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:mapstr.M(nil)}} (status=400): {"type":"illegal_argument_exception","reason":"only write ops with an op_type of create are allowed in data streams"}, dropping event!        {"ecs.version": "1.6.0"}

because the document was not sent to the specified pipeline (I assume some-pipeline1).

So the configuration below

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"

pipelines:
  - pipeline: some-pipeline1
    when.equals:
      app: "app1"
  - pipeline: some-pipeline2
    when.equals:
      app: "app2"

doesn't work for me

The configuration below worked:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  pipeline: some-pipeline1 
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"

I don't need to specify the index name in the input section.

The destination index will be set in the ingest pipeline (ok-{{app}}-write if everything is OK, errors-{{app}}-write if something goes wrong) (the redirection works properly).

Also.... the documents are updated correctly!
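A quick way to verify an update is to fetch the doc directly by its fingerprint _id (a sketch using the example _id from the debug output above):

GET ok-app1-write/_doc/0ed422b9bbf338abf372400bb348e2ac669fe22a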
