Fatal error: concurrent map iteration and map write

I am using Filebeat with the Google Cloud Storage (gcs) input and Elasticsearch as the output. Filebeat starts and then throws this error after a few seconds:


{"log.level":"warn","@timestamp":"2023-03-23T13:46:05.824Z","log.logger":"input","log.origin":{"file.name":"v2/loader.go","file.line":104},"message":"BETA: The gcs input is beta","service.name":"filebeat","input":"gcs","stability":"Beta","deprecated":false,"ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:05.824Z","log.logger":"crawler","log.origin":{"file.name":"beater/crawler.go","file.line":148},"message":"Starting input (ID: 1690526696054059251)","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:05.824Z","log.logger":"crawler","log.origin":{"file.name":"beater/crawler.go","file.line":106},"message":"Loading and starting Inputs completed. Enabled inputs: 1","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:05.824Z","log.logger":"input.gcs","log.origin":{"file.name":"compat/compat.go","file.line":113},"message":"Input 'gcs' starting","service.name":"filebeat","id":"my-gcs-id","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:05.824Z","log.logger":"input.gcs","log.origin":{"file.name":"gcs/input.go","file.line":129},"message":"Running google cloud storage for project: big-data-labs-342008","service.name":"filebeat","id":"my-gcs-id","input_source":"big-data-labs::fluentbit","project_id":"big-data-labs","bucket":"fluentbit","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:07.422Z","log.logger":"publisher_pipeline_output","log.origin":{"file.name":"pipeline/client_worker.go","file.line":139},"message":"Connecting to backoff(elasticsearch(http://logging-es-http.logging.svc.cluster.local:9200))","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:07.524Z","log.logger":"esclientleg","log.origin":{"file.name":"eslegclient/connection.go","file.line":291},"message":"Attempting to connect to Elasticsearch version 8.6.2","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:07.823Z","log.logger":"esclientleg","log.origin":{"file.name":"eslegclient/connection.go","file.line":291},"message":"Attempting to connect to Elasticsearch version 8.6.2","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:07.823Z","log.logger":"index-management","log.origin":{"file.name":"idxmgmt/std.go","file.line":231},"message":"Auto ILM enable success.","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:07.921Z","log.logger":"index-management.ilm","log.origin":{"file.name":"ilm/std.go","file.line":118},"message":"ILM policy filebeat exists already.","service.name":"filebeat","ecs.version":"1.6.0"}

{"log.level":"info","@timestamp":"2023-03-23T13:46:07.921Z","log.logger":"index-management","log.origin":{"file.name":"idxmgmt/std.go","file.line":366},"message":"Set settings.index.lifecycle.name in template to {filebeat {\"policy\":{\"phases\":{\"hot\":{\"actions\":{\"rollover\":{\"max_age\":\"30d\",\"max_primary_shard_size\":\"50gb\"}}}}}}} as ILM is enabled.","service.name":"filebeat","ecs.version":"1.6.0"}

fatal error: concurrent map iteration and map write

goroutine 145 [running]:

runtime.throw({0x55e147a93689?, 0x55e1451f67c5?})

runtime/panic.go:992 +0x71 fp=0xc001e07450 sp=0xc001e07420 pc=0x55e1449d5e51

runtime.mapiternext(0xc00069dc00?)

runtime/map.go:871 +0x4eb fp=0xc001e074c0 sp=0xc001e07450 pc=0x55e1449ae96b

github.com/elastic/go-structform.extObjVisitor.OnInt64Object({{0x55e148f68200?, 0xc00069dc00?}}, 0xc00080a540)

github.com/elastic/go-structform@v0.0.10/map.go:114 +0xc8 fp=0xc001e07560 sp=0xc001e074c0 pc=0x55e1451a5348

github.com/elastic/go-structform.(*extObjVisitor).OnInt64Object(0x55e144a80e5b?, 0x55e1488c9ea0?)

<autogenerated>:1 +0x3a fp=0xc001e07588 sp=0xc001e07560 pc=0x55e1451a735a

github.com/elastic/go-structform.(*extVisitor).OnInt64Object(0x55e1451aa14a?, 0xc00069dc00?)

<autogenerated>:1 +0x2b fp=0xc001e075a8 sp=0xc001e07588 pc=0x55e1451aac2b

github.com/elastic/go-structform/gotype.foldMapInt64(0x55e1488c9ea0?, {0x55e1488c9ea0?, 0xc00080a540?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_map.go:83 +0x3d fp=0xc001e075d0 sp=0xc001e075a8 pc=0x55e1451b5ddd

github.com/elastic/go-structform/gotype.liftFold.func1(0x55e148ca33a0?, {0x55e1488c9ea0?, 0xc00071f778?, 0x55e14aeaba80?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:605 +0xc4 fp=0xc001e07640 sp=0xc001e075d0 pc=0x55e1451bc684

github.com/elastic/go-structform/gotype.makeFieldFold.func1(0xc0008423f0, {0x55e148ca33a0?, 0xc00071f740?, 0xa?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:311 +0xa9 fp=0xc001e07698 sp=0xc001e07640 pc=0x55e1451bb0e9

github.com/elastic/go-structform/gotype.makeFieldsFold.func1(0xc001e07778?, {0x55e148ca33a0?, 0xc00071f740?, 0x16?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:198 +0x8a fp=0xc001e07700 sp=0xc001e07698 pc=0x55e1451ba70a

github.com/elastic/go-structform/gotype.makeStructFold.func1(0xc0008423f0, {0x55e148ca33a0?, 0xc00071f740?, 0xc001e077a0?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:188 +0x83 fp=0xc001e07750 sp=0xc001e07700 pc=0x55e1451ba563

github.com/elastic/go-structform/gotype.makePointerFold.func1(0xc0008423f0, {0x55e148629cc0?, 0xc00071f740?, 0x55e1449a9d74?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:124 +0x115 fp=0xc001e077a0 sp=0xc001e07750 pc=0x55e1451b9ff5

github.com/elastic/go-structform/gotype.foldAnyReflect(0x55e148629cc0?, {0x55e148629cc0?, 0xc00071f740?, 0xc001e07870?})

github.com/elastic/go-structform@v0.0.10/gotype/fold_reflect.go:568 +0x86 fp=0xc001e077f0 sp=0xc001e077a0 pc=0x55e1451bc206

github.com/elastic/go-structform/gotype.foldInterfaceValue(0xc0008423f0, {0x55e148629cc0?, 0xc00071f740})

github.com/elastic/go-structform@v0.0.10/gotype/fold.go:116 +0x27f fp=0xc001e07880 sp=0xc001e077f0 pc=0x55e1451b383f

github.com/elastic/go-structform/gotype.(*Iterator).Fold(...)

github.com/elastic/go-structform@v0.0.10/gotype/fold.go:93

github.com/elastic/beats/v7/libbeat/common/transform/typeconv.(*Converter).Convert(0xc0008081b0, {0x55e148646f00, 0xc001dfd3b0}, {0x55e148629cc0, 0xc00071f740})

github.com/elastic/beats/v7/libbeat/common/transform/typeconv/typeconv.go:108 +0x13a fp=0xc001e078f8 sp=0xc001e07880 pc=0x55e14624813a

github.com/elastic/beats/v7/libbeat/common/transform/typeconv.Convert({0x55e148646f00, 0xc001dfd3b0}, {0x55e148629cc0, 0xc00071f740})

github.com/elastic/beats/v7/libbeat/common/transform/typeconv/typeconv.go:129 +0xcb fp=0xc001e07970 sp=0xc001e078f8 pc=0x55e14624836b

github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.createUpdateOp(0xc0007008c0, 0xc0003bef80, {0x55e148629cc0?, 0xc00071f740})

github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:101 +0x1e5 fp=0xc001e07a18 sp=0xc001e07970 pc=0x55e14697bd05

github.com/elastic/beats/v7/filebeat/input/v2/input-cursor.(*cursorPublisher).Publish(0xc00080a4e0, {{0xc0ff33e80d4d627b, 0xd0b088d1, 0x55e14ae777c0}, 0xc001cf74d0, 0xc001cf72c0, {0x0, 0x0}, 0x0}, {0x55e148629cc0, ...})

github.com/elastic/beats/v7/filebeat/input/v2/input-cursor/publish.go:71 +0x86 fp=0xc001e07a70 sp=0xc001e07a18 pc=0x55e14697b8a6

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*job).readJsonAndPublish(0xc0003880e0, {0x55e148f4f508, 0xc00071f8c0}, {0x55e148f0a808?, 0xc00082e280?}, {0xc0008ce820, 0xcb})

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/job.go:222 +0xadd fp=0xc001e07d50 sp=0xc001e07a70 pc=0x55e146e32cdd

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*job).processAndPublishData(0xc0003880e0, {0x55e148f4f508, 0xc00071f8c0}, {0xc0008ce820, 0xcb})

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/job.go:152 +0x247 fp=0xc001e07e20 sp=0xc001e07d50 pc=0x55e146e31f27

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*job).do(0xc0003880e0, {0x55e148f4f508, 0xc00071f8c0}, {0xc0008ce820, 0xcb})

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/job.go:94 +0x14b fp=0xc001e07f88 sp=0xc001e07e20 pc=0x55e146e3172b

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*scheduler).scheduleOnce.func1()

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/scheduler.go:121 +0x6f fp=0xc001e07fe0 sp=0xc001e07f88 pc=0x55e146e3434f

runtime.goexit()

runtime/asm_amd64.s:1571 +0x1 fp=0xc001e07fe8 sp=0xc001e07fe0 pc=0x55e144a0ae61

created by github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*scheduler).scheduleOnce

github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/scheduler.go:119 +0x255

goroutine 1 [chan receive]:

github.com/elastic/beats/v7/filebeat/beater.(*signalWait).Wait(...)

github.com/elastic/beats/v7/filebeat/beater/signalwait.go:44

github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Run(0xc00071e6c0, 0xc000075c00)

github.com/elastic/beats/v7/filebeat/beater/filebeat.go:397 +0x16df

github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).launch(0xc000075c00, {{0x55e147a191bb, 0x8}, {0x55e147a191bb, 0x8}, {0x0, 0x0}, 0x1, 0x1, {{0x0, ...}, ...}, ...}, ...)

github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:491 +0x9b3

github.com/elastic/beats/v7/libbeat/cmd/instance.Run.func1(0xc001243d20, 0x7020100?)

github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:183 +0x145

github.com/elastic/beats/v7/libbeat/cmd/instance.Run({{0x55e147a191bb, 0x8}, {0x55e147a191bb, 0x8}, {0x0, 0x0}, 0x1, 0x1, {{0x0, 0x0}, ...}, ...}, ...)

github.com/elastic/beats/v7/libbeat/cmd/instance/beat.go:184 +0x25

github.com/elastic/beats/v7/libbeat/cmd.genRunCmd.func1(0xc0002b9080?, {0x55e147a10a04?, 0x3?, 0x3?})

github.com/elastic/beats/v7/libbeat/cmd/run.go:36 +0x58

github.com/spf13/cobra.(*Command).execute(0xc0002b9080, {0xc000072090, 0x3, 0x3})

github.com/spf13/cobra@v1.3.0/command.go:860 +0x663

github.com/spf13/cobra.(*Command).ExecuteC(0xc0002b9080)

github.com/spf13/cobra@v1.3.0/command.go:974 +0x3b4

github.com/spf13/cobra.(*Command).Execute(...)

github.com/spf13/cobra@v1.3.0/command.go:902

main.main()

github.com/elastic/beats/v7/x-pack/filebeat/main.go:22 +0x25

Hi @Z4ck404 ,

Thanks for bringing up the issue. Could you try setting max_workers to 1 for the moment and running the input again?

Hi @exdghost

I set max_workers to 1, and now it fails with a different error:

panic: assignment to entry in nil map

goroutine 402 [running]:
github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*state).savePartial(...)
	github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/state.go:81
github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*job).readJsonAndPublish(0xc0003a45b0, {0x560d80038508, 0xc000d7eb40}, {0x560d7fff3808?, 0xc000cd74a0?}, {0xc0003a86e0, 0x4a})
	github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/job.go:220 +0xa25
github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*job).processAndPublishData(0xc0003a45b0, {0x560d80038508, 0xc000d7eb40}, {0xc0003a86e0, 0x4a})
	github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/job.go:152 +0x247
github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*job).do(0xc0003a45b0, {0x560d80038508, 0xc000d7eb40}, {0xc0003a86e0, 0x4a})
	github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/job.go:94 +0x14b
github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*scheduler).scheduleOnce.func1()
	github.com/elastic/beats/v7/x-pack/filebeat/input/gcs/scheduler.go:121 +0x6f
created by github.com/elastic/beats/v7/x-pack/filebeat/input/gcs.(*scheduler).scheduleOnce

Do you know why?

Hi @Z4ck404 ,

This looks like a map allocation issue, which ideally should not happen. I'm investigating at the moment. Meanwhile, can you delete your registry and try running a fresh input? Could you also share the config you are using?
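For context, "assignment to entry in nil map" is what the Go runtime reports when code writes to a map that was declared but never allocated with make. The sketch below reproduces the pattern and shows one common guard. The state type and its partial field are hypothetical stand-ins chosen for illustration; they are not the gcs input's actual code.

```go
package main

import "fmt"

// state is a hypothetical stand-in for illustration only;
// it is not the actual type used by the gcs input.
type state struct {
	partial map[string]int64 // nil until allocated with make
}

// savePartial allocates the map lazily before writing to it.
// Without the nil check, the assignment below would panic with
// "assignment to entry in nil map" when s.partial is nil.
func (s *state) savePartial(name string, offset int64) {
	if s.partial == nil {
		s.partial = make(map[string]int64)
	}
	s.partial[name] = offset
}

func main() {
	var s state // zero value: s.partial is a nil map

	// Reading from a nil map is allowed and yields the zero value.
	fmt.Println(s.partial["missing"]) // prints 0

	s.savePartial("object.json", 42)
	fmt.Println(s.partial["object.json"]) // prints 42
}
```

A map restored from persisted state can end up nil if the stored value was empty or incomplete, which would be consistent with the error disappearing after the registry is cleared, though that is only a guess here.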

Hi @exdghost,

I am reading from a GCS bucket:

    filebeat.inputs:
    - type: gcs
      id: my-gcs-id
      enabled: true
      project_id: gcp-project-123
      auth.credentials_file.path: /mnt/secrets/credentials/gcs.json
      max_workers: 1
      poll: true
      poll_interval: 15s
      buckets:
      - name: fluentbit
        bucket_timeout: 30s

One additional question: where does Filebeat expect to find the files in the bucket?

@Z4ck404, thanks for sharing the config. If you have very large files in the bucket, "bucket_timeout" should be set to a higher value. You can experiment with that.

To answer your question: currently the gcs input scans all files in the bucket by flattening it, and if a file has a supported content type, the input reads and publishes it.
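As a rough illustration of that behaviour, the sketch below treats the bucket as one flat list of object names (any "/" in a name is just part of the name) and keeps only objects whose content type is in a supported set. The object type, the selectReadable helper, and the contents of the supported set are all assumptions made for this example; application/json is included only because the stack trace above goes through readJsonAndPublish.

```go
package main

import "fmt"

// object is a hypothetical, simplified view of a bucket object.
type object struct {
	Name        string
	ContentType string
}

// supported is an ASSUMED set of content types for illustration;
// check the gcs input documentation for the real list.
var supported = map[string]bool{"application/json": true}

// selectReadable walks the flat object list, ignoring any folder-like
// prefixes in the names, and returns the names of supported objects.
func selectReadable(objs []object) []string {
	var names []string
	for _, o := range objs {
		if supported[o.ContentType] {
			names = append(names, o.Name)
		}
	}
	return names
}

func main() {
	objs := []object{
		{Name: "app/2023/03/23/a.json", ContentType: "application/json"},
		{Name: "app/readme.txt", ContentType: "text/plain"},
		{Name: "b.json", ContentType: "application/json"},
	}
	fmt.Println(selectReadable(objs)) // prints [app/2023/03/23/a.json b.json]
}
```

In other words, under this reading the files do not need to live under any particular path in the bucket.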

We are addressing the concurrency issue that you initially posted with this PR: [filebeat][gcs] - Added missing locks for safe concurrency by ShourieG · Pull Request #34914 · elastic/beats · GitHub. It should be merged soon and released with 8.7.x.
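For anyone unfamiliar with this class of crash: Go maps are not safe for concurrent use, and the runtime aborts the process when it detects one goroutine iterating a map (runtime.mapiternext in the trace above) while another writes to it. The sketch below shows the general locking pattern, not the code from the PR. The cursorState type and its methods are hypothetical names invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// cursorState is a hypothetical stand-in for shared input state
// that holds a map; it is not the gcs input's actual type.
type cursorState struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func newCursorState() *cursorState {
	return &cursorState{offsets: make(map[string]int64)}
}

// save records an offset while holding the lock.
func (s *cursorState) save(name string, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[name] = offset
}

// snapshot copies the map while holding the lock, so the caller can
// iterate or serialize the copy without racing against writers.
func (s *cursorState) snapshot() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	cp := make(map[string]int64, len(s.offsets))
	for k, v := range s.offsets {
		cp[k] = v
	}
	return cp
}

func main() {
	s := newCursorState()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				s.save(fmt.Sprintf("worker-%d/object-%d", w, i), int64(i))
				_ = len(s.snapshot()) // iterates while other workers write
			}
		}(w)
	}
	wg.Wait()
	fmt.Println(len(s.snapshot())) // prints 400
}
```

Removing the lock from save and snapshot would let the same program fail intermittently with the fatal error from the original post, which is also why limiting the input to a single worker can hide the problem.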

Also, the second error you encountered could be a side effect of the first one, in which the registry/state was not updated properly. You can fix this by deleting the registry/state files, which should be under the same directory as your Filebeat binary.