Filebeat and updating existing docs

Is your solution working on Filebeat 8.4.3?

Did I correctly try to force the destination index via @metadata._index?

filebeat[15854]: 2022-12-29T10:30:30.992+0100 DEBUG [processors] map[file.line:210 file.name:processing/processors.go] Publish event: {
                                                "@timestamp": "2022-12-29T09:30:30.992Z",
                                                "@metadata": {
                                                  "beat": "filebeat",
                                                  "type": "_doc",
                                                  "version": "8.4.3",
                                                  "_id": "0ed422b9bbf338abf372400bb348e2ac669fe22a",
                                                  "op_type": "index",
                                                  "_index": "ok-app1-write"
                                                },
                                                "test": {
                                                  "machine": {
                                                    "description": "MF Operator sp.zoo NEW_NAME2",
                                                    "name": "3"
                                                  },
                                                  "prefix": "122000"
                                                },
                                                "ecs": {
                                                  "version": "8.0.0"
                                                },
                                                "log": {
                                                  "file": {
                                                    "path": "/opt/some_file.csv"
                                                  }
                                                },
                                                "message": "122000\t3\tMF Operator sp.zoo NEW_NAME2",
                                                "app": "app1"
                                              }        {"ecs.version": "1.6.0"}

filebeat.yml

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"
          _index: "ok-app1-write"

Now I'm receiving an error (maybe because Filebeat tries to send the data to the filebeat-* data stream):

2022-12-29T10:44:32.344+0100 WARN [elasticsearch] map[file.line:429 file.name:elasticsearch/client.go] Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 327927250, time.Local), Meta:{"_id":"0ed422b9bbf338abf372400bb348e2ac669fe22a","_index":"ok-app1-write","op_type":"index"}, Fields:{"app":"app1","ecs":{"version":"8.0.0"},"test":{"machine":{"description":"MF Operator sp.zoo NEW_NAME3","name":"3"},"prefix":"122000"},"log":{"file":{"path":"/opt/some_file.csv"}},"message":"122000\t3\tMF Operator sp.zoo NEW_NAME3"}, Private:file.State{Id:"native::942404-64768", PrevId:"", Finished:false, Fileinfo:(*os.fileStat)(0xc000a9d6c0), Source:"/opt/some_file.csv", Offset:213, Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 323541297, time.Local), TTL:-1, Type:"log", Meta:map[string]string(nil), FileStateOS:file.StateOS{Inode:0xe6144, Device:0xfd00}, IdentifierName:"native"}, TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:mapstr.M(nil)}} (status=400): {"type":"illegal_argument_exception","reason":"only write ops with an op_type of create are allowed in data streams"}, dropping event!        {"ecs.version": "1.6.0"}

My environment has been upgraded to 8.4.3 (Filebeat & Elasticsearch).

ok-app1-write is a write alias pointing to the real index:

PUT ok-app1-000001
{
  "aliases": {
    "ok-app1-write": {
      "is_write_index": true
    }
  }
}
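Which concrete index the alias currently points to can be checked with, for example:

GET _alias/ok-app1-write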

Please show me your entire filebeat.yml. That is not really the correct way to set the index.

And yes, that error indicates that you're trying to write to a data stream, because you did not set the index in the output section.

Also, did you create your own template etc?

filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"
          _index: "ok-app1-write"

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  ....

- type: log
  enabled: true
  paths:
    - /opt/other_logs2/*.csv
  Many other log inputs........

processors:
- drop_fields:
    fields: ["log.offset", "agent", "input", "source", "host"]
    ignore_missing: true

#====================

setup.template.enabled: false
setup.template.ilm.enabled: false
setup.ilm.enabled: false
logging.metrics.enabled: false
name: vm9


#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:

  hosts: ["elasticsearch_hosts"]
  protocol: "https"
  username: ""
  password: ""
  ssl.certificate_authorities: [""]
  bulk_max_size: 400
  workers: 2

#================================ Logging =====================================
logging.to_syslog: true
logging.to_files: true
logging.level: "debug"
logging.selectors: [ "*" ]
logging.files:
  name: filebeat
  rotateeverybytes: 10485760
  keepfiles: 10

http.enabled: true
monitoring.enabled: false

Templates & typical indices (not data streams) are created.

Usually I set the index name via an ingest pipeline:

pipelines:
  - pipeline: some-pipeline
    when.equals:
      app: "app1"

Here I'm trying to set the index name directly in Filebeat.

Hi @Marcin_Frankiewicz

What you have currently configured will not work (as you can see); as long as you do not name an index in the output section, the default data stream is assumed, and that is the issue.

Trying to force the index name in the document itself is an anti-pattern; perhaps we can figure out a better way...

Pretty sure we can do what you want... but I need a bit better understanding...

But can you clarify a few things for me?

  1. Are you trying to route different documents to different indices based on a field in the message?

  2. Or are you just trying to route them to different pipelines for different processing?

  3. Or both... I feel like I am feeling around the elephant blindfolded 🙂

  4. Note for later... for updating a document you need to know the exact index, so if the write alias rolls over to a new index, I don't think this will work...

Can you kinda give me some pseudo code / logic of what you are trying to do?

But if you just want to crash in and try, you will need to set the following... you must set the index in the output section, and I would take out the add_fields _index: "ok-app1-write".

This requires that you already have an index template that creates normal indices, not data streams.

And I validated that it does work on an 8.4.3 stack.

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "ok-app1-write"

setup.ilm.enabled: false
setup.template.name: "ok-app1-write"
setup.template.pattern: "ok-app1-write-*"
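For reference, a minimal index template for this could look roughly like the following; the name, pattern and settings are just placeholders, and because there is no data_stream section, matching indices are created as regular indices:

PUT _index_template/ok-app1
{
  "index_patterns": ["ok-app1-*"],
  "template": {
    "settings": {
      "number_of_shards": 1
    }
  }
}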

I'm trying to route different documents to different indices by the app field.

filebeat.yml

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  fields:
     app: app2

Documents which contain app: app1 should go to the ok-app1-write index.
Documents which contain app: app2 should go to the ok-app2-write index.

To get this, I'm trying two approaches:

  1. Force the index name via @metadata._index (as you mention, it doesn't work).

  2. Force the index name by routing to a specific ingest pipeline; that pipeline will set the index name.

filebeat.yml

pipelines:
  - pipeline: some-pipeline1
    when.equals:
      app: "app1"
  - pipeline: some-pipeline2
    when.equals:
      app: "app2"

ingest pipeline

    "processors": [
      {
        "set": {
          "field": "ecs.version",
          "value": "1.5.0"
        }
      },
      {
        "set": {
          "field": "_index",
          "value": "ok-{{app}}-write"
        }
      },
      {
        "set": {
          "field": "event.ingested",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ],
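To check what the pipeline does before wiring it into Filebeat, it can be simulated; assuming it is stored as some-pipeline1, something like this shows the rewritten _index in the result (the sample document is made up from my CSV data):

POST _ingest/pipeline/some-pipeline1/_simulate
{
  "docs": [
    {
      "_index": "filebeat-test",
      "_source": {
        "app": "app1",
        "message": "122000\t3\tMF Operator sp.zoo NEW_NAME2"
      }
    }
  ]
}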

I think that Logstash can do that... (update documents via a write alias) (doc_as_upsert).
Right now I don't have proof of that (in the future I will try to deliver it).

Thanks for the information

  1. The simplest way to solve this is to put the index name in the input; each input can define an index that will be used in the output. Sorry, I should have shown you that earlier, but we were distracted by the bug. You can also define a pipeline for each input if you want (see the sketch below).

This works, I tested it; no more trying to hack the index name.

Each index name could be a write alias if you intend to use ILM / Rollover, which is OK, just a tad more work... if no Rollover / ILM is needed, then they would just be concrete indices.
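A sketch of what I mean, reusing the names from your config (the pipeline names are just placeholders; drop the pipeline lines if you don't need them):

filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
    app: app1
  fields_under_root: true
  index: "ok-app1-write"
  pipeline: "ok-app1-pipeline"

- type: log
  enabled: true
  paths:
    - /opt/other_logs/*.csv
  fields:
    app: app2
  fields_under_root: true
  index: "ok-app2-write"
  pipeline: "ok-app2-pipeline"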

OK, here is the subtlety: suppose you are using ILM / Rollover via a write alias...

Today you write a document with _id: 1234567890, and the write alias ok-app1-write is actually pointing to ok-app1-write-2022.12.30-000001.

Then tomorrow you get an update with _id: 1234567890, but now ok-app1-write is actually pointing to ok-app1-write-2022.12.31-000002.

The original document is in ok-app1-write-2022.12.30-000001, but you are now writing to a different index... you will then get a duplicate document in ok-app1-write-2022.12.31-000002; it won't magically find the _id in the other index... that is what I am trying to tell you.

That is independent of the Filebeat or Logstash method...
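If you want to see that for yourself, a quick Dev Tools sketch (the document bodies are made up):

PUT ok-app1-write/_doc/1234567890
{
  "test": { "machine": { "description": "NEW_NAME2" } }
}

POST ok-app1-write/_rollover

PUT ok-app1-write/_doc/1234567890
{
  "test": { "machine": { "description": "NEW_NAME3" } }
}

GET ok-app1-*/_search?q=_id:1234567890

The second PUT goes to the new backing index, so the search returns two documents with the same _id, one in each backing index.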

OK, now I understand how it works.......

I will try to set the index name in the input section, but... setting the index name via the ingest pipeline has one advantage.

If something goes wrong in the ingest pipeline (a processor error), then the document should be redirected (depending on the app field) to another index:

error-app1-write

or

error-app2-write

This is possible via the on_failure section:

"on_failure" : [
      {
        "set" : {
          "field" : "processor_error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      },
      {
        "set" : {
          "field" : "processor_error.processor_type",
          "value" : "{{ _ingest.on_processor_type }}"
        }
      },
      {
        "set" : {
          "field" : "processor_error.processor_tag",
          "value" : "{{ _ingest.on_processor_tag }}"
        }
      },
      {
        "set" : {
          "field" : "_index",
          "value" : "error-{{app}}-write"
        }
      }
    ]

Sure, perhaps... you will have to test; depending on the error, some errors only happen when the document is actually written to the index. But yes, I guess you can do that. Most folks just tag the record with a failure and write it to the same index, which is then easily viewed with a filter... instead of hunting between multiple indices... but it seems valid.

The great thing about Elastic... it is an open flexible platform!
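For comparison, the tag-and-keep variant would be something along these lines (the field and tag names are just examples):

"on_failure" : [
      {
        "set" : {
          "field" : "processor_error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      },
      {
        "append" : {
          "field" : "tags",
          "value" : "pipeline_failure"
        }
      }
    ]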

If I set the index name in the Filebeat input section:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  index: ok-app1-write
  pipeline: ok-app1-pipeline

then can the on_failure directive in the ingest pipeline change the index name? (Redirect a buggy document to the index error-app1-write? Redirect it if some operation in the ingest pipeline fails?)

I don't want to have buggy documents in the ok-app1-write index... (only in error-app1-write).

You should be able to... since you are in the edge cases, I would test it... best way to find out.


I did some tests with your tips, and here are my results.

Previously I had the error below:

2022-12-29T10:44:32.344+0100 WARN [elasticsearch] map[file.line:429 file.name:elasticsearch/client.go] Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 327927250, time.Local), Meta:{"_id":"0ed422b9bbf338abf372400bb348e2ac669fe22a","_index":"ok-app1-write","op_type":"index"}, Fields:{"app":"app1","ecs":{"version":"8.0.0"},"test":{"machine":{"description":"MF Operator sp.zoo NEW_NAME3","name":"3"},"prefix":"122000"},"log":{"file":{"path":"/opt/some_file.csv"}},"message":"122000\t3\tMF Operator sp.zoo NEW_NAME3"}, Private:file.State{Id:"native::942404-64768", PrevId:"", Finished:false, Fileinfo:(*os.fileStat)(0xc000a9d6c0), Source:"/opt/some_file.csv", Offset:213, Timestamp:time.Date(2022, time.December, 29, 10, 44, 31, 323541297, time.Local), TTL:-1, Type:"log", Meta:map[string]string(nil), FileStateOS:file.StateOS{Inode:0xe6144, Device:0xfd00}, IdentifierName:"native"}, TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:mapstr.M(nil)}} (status=400): {"type":"illegal_argument_exception","reason":"only write ops with an op_type of create are allowed in data streams"}, dropping event!        {"ecs.version": "1.6.0"}

This was because the document was not sent to the specified pipeline (I assume some-pipeline1).

So the configuration below:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"

pipelines:
  - pipeline: some-pipeline1
    when.equals:
      app: "app1"
  - pipeline: some-pipeline2
    when.equals:
      app: "app2"

doesn't work for me.

The configuration below worked:

- type: log
  enabled: true
  paths:
    - /opt/*.csv
  fields:
     app: app1
  close_eof: true
  fields_under_root: true
  pipeline: some-pipeline1 
  processors:
    - dissect:
        tokenizer: "%{prefix}   %{machine.name}        %{machine.description}"
        field: "message"
        target_prefix: "test"
    - fingerprint:
        fields: ["test.machine.name" , "test.prefix"]
        target_field: '@metadata._id'
        method: "sha1"
    - add_fields:
        target: '@metadata'
        fields:
          op_type: "index"

I don't need to specify the index name in the input section.

The destination index will be set in the ingest pipeline (ok-{{app}}-write if everything is OK, error-{{app}}-write if something goes wrong), and the redirection works properly.

Also.... the documents are updated correctly!
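For completeness, the full ingest pipeline is now roughly the following (a sketch assembled from the fragments above, registered under the name used in the input):

PUT _ingest/pipeline/some-pipeline1
{
  "processors": [
    {
      "set": {
        "field": "ecs.version",
        "value": "1.5.0"
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "ok-{{app}}-write"
      }
    },
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "processor_error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "error-{{app}}-write"
      }
    }
  ]
}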
