Filebeat and updating existing docs

The events look good...

Apologies, I'm not quite sure I understand?

My question is: if you don't use any pipelines, is the correct ID set? That would be the next step to debug.

These do not match.

some_app
vs
some-app

You gave me an example with a Filebeat configuration where you are crawling "/Users/sbrown/workspace/customers/acme/sample-data/catalina.out"

I assume that those logs aren't going to any ingest pipeline.

In which index will the logs be available? filebeat-*? Any other?

That was just an example, and actually I did have a pipeline but I took it out.

It appears you are trying to route to a custom index, kind of as a "workaround", in the ingest pipeline... there are probably some other implications with that... I would probably back up a bit and try to understand what you are trying to accomplish.

We often see segmenting based on app etc... sometimes it is useful, sometimes not, and there are different strategies to accomplish that. Often we see teams start with segmentation on the ingest side just to realize that the same, and possibly more, can be accomplished just as easily on the read / query side. Either way is valid... but I often tell folks just getting started: use the defaults, learn, iterate.

At this point, I am just trying to answer your questions because I am not clear what you are trying to accomplish.

Well in 8.4 it's a data stream...

The data stream will be for example

filebeat-8.4.3

But the actual backing index will be a hidden .ds-* backing index:

GET _cat/indices/*?v

health status index                                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   .ds-filebeat-8.4.3-2022.12.10-000001 ptIW3qCLRpyVvNu88nBplw   1   1         28            0       26kb           26kb

You can see them in Kibana Stack Management -> Index Management

Ohh wait, you are using Filebeat 7.17... then it will be a concrete index like

filebeat-7.17.6-2022.12.10-000001

You are using mixed modes, 7.x plus 8.x, which also makes things a bit more confusing... 7.x is index centric and 8.x is data stream centric:
7.x Filebeat will write an index in 8.x Elasticsearch
8.x Filebeat will write a data stream in 8.x Elasticsearch
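
If you want to double-check which one you actually have, you can look in Kibana Dev Tools (a quick sketch; adjust the patterns to your versions):

# Data streams (what an 8.x Filebeat writes)
GET _data_stream/filebeat-*

# Concrete and backing indices (a 7.x Filebeat writes concrete indices)
GET _cat/indices/filebeat-*?v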

I'm trying to update existing documents in elasticsearch, based on @metadata._id

Example

First published event

filebeat[26461]: DEBUG [processors] processing/processors.go:203 Publish event: {
                                                "@timestamp": "2022-12-10T10:10:28.695Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "7.17.5",
                                                  "_id": "**791f529272c4e95fbba6273ace4c3e6db0b2944e**"
                                                },
                                                "app": "app1",
                                                "test": {
                                                  "machine": {
                                                    "name": "1",
                                                    "description": "Test machine description2"
                                                  },
                                                  "prefix": "118fa800"
                                                }
                                              }

That document should be created in Elasticsearch, because a doc with id 791f529272c4e95fbba6273ace4c3e6db0b2944e does not exist.

After some time... Filebeat crawls a new file, with a new test.machine.description:

filebeat[26461]: DEBUG [processors] processing/processors.go:203 Publish event: {
                                                "@timestamp": "2022-12-12T10:10:28.695Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "7.17.5",
                                                  "_id": "**791f529272c4e95fbba6273ace4c3e6db0b2944e**"
                                                },
                                                "app": "app1",
                                                "test": {
                                                  "prefix": "118fa800",
                                                  "machine": {
                                                    "name": "1",
                                                    "description": "**New test machine description2**"
                                                  }
                                                }
                                              }

The ID is the same as in the previous event, so the document should be updated (that's what I'm trying to accomplish).

Unfortunately, when I'm setting

    - fingerprint:
        fields: ["test.machine.name", "test.prefix"]
        target_field: "@metadata._id" -- **That line is problematic**
        method: "sha1"

the doc never appears in Kibana... or I don't know where I can find it.

This topic is similar to:

Hi @Marcin_Frankiewicz Thanks for the details, I think I do understand what you are trying to accomplish.

I am editing this post; I was looking at 8.5 with data streams, I need to re-look at 7.17.x.

BTW this will NOT work in 8.x because data streams are append only and do not support updates, which is really what you are doing... (upsert).

Let me look closer at 7.17.x

I definitely can write a doc with the fingerprint, but seem to be struggling to update with the same _id

You should focus on the initial write.. that works for sure... you need to debug that..

When you say...

How exactly do you determine that it does not exist?
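
For example, you could look the document up by its fingerprint _id directly in Dev Tools (a sketch; adjust the index pattern to whatever your Filebeat writes to):

GET filebeat-*/_search
{
  "query": {
    "ids": {
      "values": ["791f529272c4e95fbba6273ace4c3e6db0b2944e"]
    }
  }
}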

Also, if you take out the fingerprint, do docs get written?

This works for sure ...

I am looking / working on the updates, which I am having issues with as well.

I do know how to make this work with logstash as it has a specific doc_as_upsert flag

Wow, I spent a lot of time on this... I have a question in to engineering... I have no problem getting the fingerprint to work on the initial index, but I cannot get it to update the document. I have a suspicion why, but I will wait. I tried everything I know, including rarely used configurations; I could not get it to work...

Apologies about the difficulty. I agree Filebeat / Elasticsearch is not working as documented / described here.

So in the meantime, if you want to set your own _id and update the documents when needed, this will and does work. I tested this on 7.17.x (it actually changes some in 8.x). This is what people have been doing for a long time with Logstash; Logstash provides granular control over the index actions.

The architecture will be Filebeat->Logstash->Elasticsearch

Below I have included a filebeat.yml and a Logstash pipeline config (beats-logstash.conf), with comments in the filebeat.yml.

The process:

  1. Clean up any existing indices etc.
  2. Configure filebeat to point at Elasticsearch
  3. run filebeat setup -e
  4. Configure filebeat to point to Logstash (see the config)
  5. Start Logstash with the configuration I provided... you can read about the settings I used here
  6. Start filebeat however you do
  7. As new documents come in with the same @metadata._id they will be updated
  8. I tested this and it does work for sure

filebeat.yml

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.


- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  #pipeline: onsemi-catalina-base
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    # - "/Users/sbrown/workspace/customers/onsemi/sample-data/ELK_Log_Samples_TC1/TC1_Mapper/MapperLog_2022-10-18_08-09_UV5_22156F8G001.000.small.txt"
    - "/Users/sbrown/workspace/customers/onsemi/sample-data/catalina.out"
    # - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*

  parsers:
    - multiline:
        type: pattern
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
# output.console:

############
# UNCOMMENT output.elasticsearch and run filebeat setup -e FIRST and then comment out to run Logstash
############
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]
  # pipeline: discuss-id

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ------------------------------ Logstash Output -------------------------------
############
# Comment out output.logstash when running setup, uncomment output.logstash when running
############
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

beats-logstash.conf

################################################
# beats->logstash->es default config.
################################################
input {
  beats {
    port => 5044
  }
}

output {
  stdout {}
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "http://localhost:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => "%{[@metadata][pipeline]}" 
      # user => "elastic"
      # password => "secret"
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      action => update
    }
  } else {
    elasticsearch {
      hosts => "http://localhost:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      # user => "elastic"
      # password => "secret"
      document_id => "%{[@metadata][_id]}" 
      doc_as_upsert => true
      action => update
    }
  }
}

Thank you very much for the investigation :slight_smile:
Your working example may help other people deal with that problem (updating documents with Filebeat -> Logstash -> Elasticsearch).

OK

I have debugged that, and I found the "lost" docs in the Filebeat indices (when I'm not using an ingest pipeline), with the fingerprint-generated ID.

After some tests I'm able to write docs with the generated fingerprint too, but I cannot get it to update the document.

That's the main purpose of this topic :slight_smile:
Update documents without Logstash

Is it a bug? Or a mistake in the documentation?

I don't know. I'm waiting for a response... And it's the end of the year and kind of busy, so I'm not sure exactly when I will hear back.

You could certainly open a bug against the repo if you like.

There is a combination of things going on...

  • There is logic that checks the operation type create (create only) or index (create or update)
  • Since we are setting the _id, the operation gets set to create; that explains the behavior
  • We can manually set the op_type BUT there is a bug with that, so that does not work (that is what I was trying)
  • The bug had been fixed in later versions but not back-ported to 7.17.x... there is now a PR for that, BUT you need to update to that version when it comes out.

You will need to follow that....

In short, your only solution today is to wait for that and set @metadata.op_type: index, or use the Logstash solution I showed you.
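
For reference, once that fix ships, the idea would look roughly like this (a sketch using the same fingerprint fields as your config; it is essentially the combination shown later in this thread):

  processors:
    - fingerprint:
        fields: ["test.machine.name", "test.prefix"]
        target_field: "@metadata._id"
        method: "sha1"
    - add_fields:
        target: "@metadata"
        fields:
          op_type: "index"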

The docs are no longer correct, I am not sure when that would get fixed.


That's great news :slight_smile:
I will wait for the new version.

In higher Filebeat versions (8.x), can it be done the same way?

See this

Beats are all data streams from 8.x on.

The only way I know of at this time to do what you want will be to use the Logstash approach and not use data streams.

I will keep my eyes open for another approach.


@Marcin_Frankiewicz Huh :slight_smile: !

I think I got it to work in 8.5.3 with a bit of a workaround / hack using normal indices (not data streams)... and no Logstash...

You will need to manually set up your index template etc. as a normal index / not a data stream... and leave in those settings below so you can write to an index name, not a data stream.
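
If you prefer to manage the template yourself rather than letting Filebeat's setup do it, a minimal sketch would be something like this (the template name and pattern are placeholders matching the settings below):

# No "data_stream" section here, so matching indices are plain indices, not data streams
PUT _index_template/customname
{
  "index_patterns": ["customname-*"],
  "priority": 200,
  "template": {
    "settings": {
      "number_of_shards": 1
    }
  }
}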

Then, strangely, this all works pretty simply because the logic in that bug is fixed... @metadata._id plus @metadata.op_type: "index".

This worked! It updated the Doc in Place!

filebeat.inputs:

- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: my-filestream-id

  # Change to true to enable this input configuration.
  enabled: true
  paths:
    - "/Users/sbrown/workspace/customers/onsemi/sample-data/catalina.out"

  processors:
    - fingerprint:
        fields: ["message"]
        target_field: "@metadata._id"
        method: "sha1"
    - add_fields:
        target: "@metadata"
        fields:
          op_type: "index"

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "customname-index"
setup.ilm.enabled: false
setup.template.enabled: true
setup.template.name: "customname"
setup.template.pattern: "customname-*"

Is your solution working on Filebeat 8.4.3?

Did I properly try to force the destination index via @metadata._index?

filebeat[15854]: 2022-12-29T10:30:30.992+0100 DEBUG [processors] map[file.line:210 file.name:processing/processors.go] Publish event: {
                                                "@timestamp": "2022-12-29T09:30:30.992Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "8.4.3",
                                                  "_id": "0ed422b9bbf338abf372400bb348e2ac669fe22a",
                                                  "op_type": "index",
                                                  "_index": "ok-app1-write"
                                                },
                                                "test": {
                                                  "machine": {
                                                    "description": "MF Operator sp.zoo NEW_NAME2",
                                                    "name": "3"
                                                  },
                                                  "prefix": "122000"
                                                },
                                                "ecs": {
                                                  "version": "8.0.0"
                                                },
                                                "log": {
                                                  "file": {
                                                    "path": "/opt/some_file.csv"
                                                  }
                                                },
                                                "message": "122000\t3\tMF Operator sp.zoo NEW_NAME2",
                                                "app": "app1"
                                              }        {"ecs.version": "1.6.0"}

filebeat.yml

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"
          _index: "ok-app1-write"

Now I'm receiving an error (maybe because Filebeat tries to send data to the filebeat-* data stream):

2022-12-29T10:44:32.344+0100 WARN [elasticsearch] map[file.line:429 file.name:elasticsearch/client.go] Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 327927250, time.Local), Meta:{"_id":"0ed422b9bbf338abf372400bb348e2ac669fe22a","_index":"ok-app1-write","op_type":"index"}, Fields:{"app":"app1","ecs":{"version":"8.0.0"},"test":{"machine":{"description":"MF Operator sp.zoo NEW_NAME3","name":"3"},"prefix":"122000"},"log":{"file":{"path":"/opt/some_file.csv"}},"message":"122000\t3\tMF Operator sp.zoo NEW_NAME3"}, Private:file.State{Id:"native::942404-64768", PrevId:"", Finished:false, Fileinfo:(*os.fileStat)(0xc000a9d6c0), Source:"/opt/some_file.csv", Offset:213, Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 323541297, time.Local), TTL:-1, Type:"log", Meta:map[string]string(nil), FileStateOS:file.StateOS{Inode:0xe6144, Device:0xfd00}, IdentifierName:"native"}, TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:mapstr.M(nil)}} (status=400): {"type":"illegal_argument_exception","reason":"only write ops with an op_type of create are allowed in data streams"}, dropping event!        {"ecs.version": "1.6.0"}

My environment has been upgraded to 8.4.3 (Filebeat & Elasticsearch).

ok-app1-write is a write alias to a real index:

PUT ok-app1-000001
{
  "aliases": {
    "ok-app1-write": {
      "is_write_index": true
    }
  }
}

Please show me your entire filebeat.yml. That is not really the correct way to set the index.

And yes, that error indicates that you're trying to write to a data stream because you did not set the index in the output section

Also, did you create your own template etc?

filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"
          _index: "ok-app1-write"

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  ....

- type: log
  enabled: true
  paths:
    - /opt/other_logs2/*.csv
  Many other input type log........

processors:
- drop_fields:
    fields: ["log.offset", "agent", "input", "source", "host"]
    ignore_missing: true

#====================

setup.template.enabled: false
setup.template.ilm.enabled: false
setup.ilm.enabled: false
logging.metrics.enabled: false
name: vm9


#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:

  hosts: ["elasticsearch_hosts"]
  protocol: "https"
  username: ""
  password: ""
  ssl.certificate_authorities: [""]
  bulk_max_size: 400
  workers: 2

#================================ Logging =====================================
logging.to_syslog: true
logging.to_files: true
logging.level: "debug"
logging.selectors: [ "*" ]
logging.files:
  name: filebeat
  rotateeverybytes: 10485760
  keepfiles: 10

http.enabled: true
monitoring.enabled: false

Templates & a typical index (not a data stream) are created.

Usually I'm setting the index name via an ingest pipeline:

pipelines:
  - pipeline: some-pipeline
    when.equals:
      app: "app1"

Here I'm trying to set the index name directly in Filebeat.

Hi @Marcin_Frankiewicz

What you currently have configured will not work (as you can see). As soon as you do not name an index in the output section, the default data stream is assumed, and that is the issue.

This whole business of trying to force the index name in the document itself is an anti-pattern; perhaps we can figure out a better way...

Pretty sure we can do what you want... but I need a bit better understanding...

But can you clarify a few things for me.

  1. Are you trying to route different documents to different indices based on a field in the message?

  2. Or are you just trying to route them to different pipelines for different processing?

  3. Or both... I feel like I am feeling around the elephant blindfolded :slight_smile:

  4. Note for later... for updating the document you need to know the exact index so if the write alias rolls over to a new index I don't think this will work....

Can you kinda give me some pseudo code / logic of what you are trying to do?

But if you just want to crash in and try, you will need to set the following... you must set the index in the output section, and I would take out the add_fields: _index: "ok-app1-write".

This requires that you already have an index template that creates normal indices not data streams.

And I validated it does work on the 8.4.3 stack.

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "ok-app1-write"

setup.ilm.enabled: false
setup.template.name: "ok-app1-write"
setup.template.pattern: "ok-app1-write-*"

I'm trying to route different documents to different indices by the app field.

filebeat.yml

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  fields:
     app: app2

Documents which contain app: app1 should go to the ok-app1-write index
Documents which contain app: app2 should go to the ok-app2-write index

To get this I'm trying two approaches:

  1. Force the index name via @metadata._index (as you mention, it doesn't work)

  2. Force the index name by routing to a specific ingest pipeline. That pipeline will set the index name

filebeat.yml

pipelines:
  - pipeline: some-pipeline1
    when.equals:
      app: "app1"
  - pipeline: some-pipeline2
    when.equals:
      app: "app2"

ingest pipeline

    "processors": [
      {
        "set": {
          "field": "ecs.version",
          "value": "1.5.0"
        }
      },
      {
        "set": {
          "field": "_index",
          "value": "ok-{{app}}-write"
        }
      },
      {
        "set": {
          "field": "event.ingested",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ],

I think that Logstash can do that (update documents via a write alias) with doc_as_upsert.
I don't have proof of that now (in the future I will try to deliver that).

Thanks for the information

  1. The simplest way to solve this is to put the index name in the input; each input can define an index that will be used in the output. Sorry, I should have shown you that earlier, but we were distracted with the bug. You can also define a pipeline for each input if you wanted.

This works, I tested it; no more trying to hack the index name.
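
Roughly like this (a sketch built from the inputs earlier in this thread; paths, index names, and output settings are placeholders to adjust):

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /opt/*.csv
    fields:
      app: app1
    fields_under_root: true
    # index used by the elasticsearch output for events from this input
    index: "ok-app1-write"
    # optionally a per-input ingest pipeline as well
    # pipeline: some-pipeline1

  - type: log
    enabled: true
    paths:
      - /opt/other_logs/*.csv
    fields:
      app: app2
    fields_under_root: true
    index: "ok-app2-write"

setup.ilm.enabled: false
setup.template.enabled: false

output.elasticsearch:
  hosts: ["elasticsearch_hosts"]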

Each index name could be a write alias if you intended to use ILM / Rollover, which is OK, just a tad more work... if no Rollover / ILM is needed then they would just be concrete indices.

OK, here is the subtlety: suppose you are using ILM / Rollover via a write alias...

Today you write a document with _id: 1234567890 and the write alias is ok-app1-write, which is actually pointing to ok-app1-write-2022.12.30-000001.

Then tomorrow you get an update with _id: 1234567890, but now ok-app1-write is actually pointing to ok-app1-write-2022.12.31-000002.

The original document is in ok-app1-write-2022.12.30-000001 but you are now writing to a different index... you will then get a duplicate document in ok-app1-write-2022.12.31-000002; it won't magically find the _id in the other index... that is what I am trying to tell you.

That is independent of this filebeat or logstash method...

OK, now I understand how it works.......

I will try to set the index name in the input section, but... setting the index name via an ingest pipeline has one advantage.

If something goes wrong in the ingest pipeline (a processor error), then the document is redirected (depending on the app field) to another index:

error-app1-write

or

error-app2-write

It is possible via the on_failure section:

"on_failure" : [
      {
        "set" : {
          "field" : "processor_error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      },
      {
        "set" : {
          "field" : "processor_error.processor_type",
          "value" : "{{ _ingest.on_processor_type }}"
        }
      },
      {
        "set" : {
          "field" : "processor_error.processor_tag",
          "value" : "{{ _ingest.on_processor_tag }}"
        }
      },
      {
        "set" : {
          "field" : "_index",
          "value" : "error-{{app}}-write"
        }
      }
    ]