How to configure the APM Server Kafka output so the topic name comes from the event's processor.event

I want to send my apm-server output to Kafka. Following the official documentation, I configured the output.kafka section of my apm-server.yml:

output.kafka:
  hosts: ["localhost:9092"]
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

I started apm-server, but found this error message in the apm-server log:

Dropping event: no topic could be selected.

It seems that the Beats layer inside apm-server cannot find a fields.log_topic field in the event.


I changed the output.kafka section of apm-server.yml to
topic: '%{[log_topic]}'
and restarted apm-server, but the error message is still there:
Dropping event: no topic could be selected


Here is an example apm-server event (a metric event):

{
"@timestamp": "2019-12-29T07:50:11.704Z",
"@metadata": {
"beat": "apm-server",
"type": "_doc",
"version": "7.3.1",
"pipeline": "apm"
},
"system": {
"memory": {
"actual": {
"free": 9.9181379584e+10
},
"total": 1.34908739584e+11
},
"process": {
"cpu": {
"total": {
"norm": {
"pct": 0
}
}
},
"memory": {
"size": 1.6571547648e+10
}
},
"cpu": {
"total": {
"norm": {
"pct": 0
}
}
}
},
"processor": {
"name": "metric",
"event": "metric"
},
"service": {
"runtime": {
"name": "Java",
"version": "1.8.0_222"
},
"name": "demo2-employee-service",
"language": {
"name": "Java",
"version": "1.8.0_222"
}
},
"agent": {
"name": "java",
"version": "1.9.0",
"ephemeral_id": "66172de9-40e5-43eb-b62b-127c6d3cde19"
},
"host": {
"hostname": "localhost.localdomain",
"architecture": "amd64",
"os": {
"platform": "Linux"
},
"ip": "218.94.19.122"
},
"ecs": {
"version": "1.0.1"
},
"jvm": {
"gc": {
"alloc": 1.62079504e+08
},
"memory": {
"heap": {
"committed": 2.147483648e+09,
"used": 1.31290632e+08,
"max": 2.147483648e+09
},
"non_heap": {
"max": -1,
"used": 1.14382576e+08,
"committed": 1.18685696e+08
}
},
"thread": {
"count": 41
}
},
"observer": {
"id": "1b777420-fcbe-4dba-bb53-50e67f2600be",
"ephemeral_id": "df756826-54ad-4b1d-9f79-164b73598091",
"type": "apm-server",
"hostname": "localhost.localdomain",
"version": "7.3.1",
"version_major": 7
},
"process": {
"pid": 19600,
"title": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java"
}
}

I want the topic to be chosen from processor.event, so for the sample above the topic would be metric. I changed my apm-server.yml configuration to

topic: '%{[processor.event]}'

and restarted apm-server, but the only topic that gets created is onboarding.
...
I changed the apm-server.yml log level to debug
and found the following messages:
2019-12-29T15:40:58.822+0800 INFO [onboarding] beater/onboarding.go:31 Publishing onboarding document
2019-12-29T15:40:58.822+0800 DEBUG [processors] processing/processors.go:183 Publish event: {
"@timestamp": "2019-12-29T07:40:58.822Z",
"@metadata": {
"beat": "apm-server",
"type": "_doc",
"version": "7.3.1",
"pipeline": "apm"
},
"processor": {
"event": "onboarding",
"name": "onboarding"
},
"observer": {
"listening": "218.94.19.122:18200",
"id": "1b777420-fcbe-4dba-bb53-50e67f2600be",
"ephemeral_id": "2761fdb3-871e-4404-ad12-4512aecfbedb",
"type": "apm-server",
"hostname": "localhost.localdomain",
"version": "7.3.1",
"version_major": 7
},
"ecs": {
"version": "1.0.1"
}
}
2019-12-29T15:40:59.823+0800 INFO pipeline/output.go:95 Connecting to kafka(localhost:9092)
2019-12-29T15:40:59.823+0800 DEBUG [kafka] kafka/client.go:93 connect: [localhost:9092]
2019-12-29T15:40:59.823+0800 INFO kafka/log.go:53 kafka message: Initializing new client
2019-12-29T15:40:59.823+0800 INFO kafka/log.go:53 kafka message: Successfully initialized new client
2019-12-29T15:40:59.823+0800 INFO pipeline/output.go:105 Connection to kafka(localhost:9092) established
2019-12-29T15:40:59.824+0800 INFO kafka/log.go:53 client/metadata fetching metadata for [onboarding] from broker localhost:9092

2019-12-29T15:40:59.828+0800 INFO kafka/log.go:53 Connected to broker at localhost:9092 (unregistered)

2019-12-29T15:40:59.832+0800 INFO kafka/log.go:53 client/brokers registered new broker #0 at localhost:9092
2019-12-29T15:40:59.832+0800 INFO kafka/log.go:53 producer/broker/0 starting up

2019-12-29T15:40:59.832+0800 INFO kafka/log.go:53 producer/broker/0 state change to [open] on onboarding/0

So is the topic format string evaluated against Filebeat-style event fields rather than the apm-server event fields?
What I actually want is to route apm-server events by event type, such as metric, error, transaction, and span. Any suggestions on how to configure the topic?


I changed the topic configuration as follows:
  topic: 'logs-%{[beat.version]}'
  topics:
    - topic: sourcemap
      when.contains:
        processor.event: "sourcemap"
    - topic: error
      when.contains:
        processor.event: "error"
    - topic: transaction
      when.contains:
        processor.event: "transaction"
    - topic: span
      when.contains:
        processor.event: "span"
    - topic: metric
      when.contains:
        processor.event: "metric"
    - topic: onboarding
      when.contains:
        processor.event: "onboarding"
But the topic is still onboarding. Here is the debug log:


2019-12-30T10:01:03.773+0800 DEBUG [processors] processing/processors.go:183 Publish event: {
"@timestamp": "2019-12-30T02:00:53.767Z",
"@metadata": {
"beat": "apm-server",
"type": "_doc",
"version": "7.3.1",
"pipeline": "apm",
"topic": "onboarding",
"partition": 0
},
"ecs": {
"version": "1.0.1"
},
"processor": {
"name": "metric",
"event": "metric"
},
"host": {
"hostname": "localhost.localdomain",
"architecture": "amd64",
"os": {
"platform": "Linux"
},
"ip": "218.94.19.122"
},
"labels": {
"name": "G1 Young Generation"
},
"observer": {
"type": "apm-server",
"hostname": "localhost.localdomain",
"version": "7.3.1",
"version_major": 7,
"id": "1b777420-fcbe-4dba-bb53-50e67f2600be",
"ephemeral_id": "7b445a73-adf8-4dfb-b2d1-07015c18e40f"
},
"jvm": {
"gc": {
"count": 26,
"time": 1057
}
},
"service": {
"language": {
"name": "Java",
"version": "1.8.0_222"
},
"runtime": {
"name": "Java",
"version": "1.8.0_222"
},
"name": "dem2-config-server-service"
},
"agent": {
"name": "java",
"version": "1.9.0",
"ephemeral_id": "761c5477-ed6a-473e-a0d4-02f6bda633f5"
},
"process": {
"title": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java",
"pid": 17208
}
}
2019-12-30T10:01:04.781+0800 DEBUG [kafka] kafka/client.go:264 finished kafka batch
2019-12-30T10:01:04.781+0800 DEBUG [publisher] memqueue/ackloop.go:160 ackloop: receive ack [49: 0, 3]


As explained in the documentation, the '%{[fields.log_topic]}' example uses a custom field. The same documentation page also describes how such a field can be configured.
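
For context, here is a rough sketch of how such a custom field is usually declared. It assumes that apm-server 7.3 honors the libbeat general setting `fields` the same way other Beats do, which nobody in this thread has verified. The key `log_topic` is user-chosen and the value `apm-events` is a made-up placeholder.

```yaml
# Assumption: the libbeat general setting `fields` is honored by apm-server.
# `log_topic` is a user-chosen key; "apm-events" is a placeholder value.
fields:
  log_topic: "apm-events"

output.kafka:
  hosts: ["localhost:9092"]
  # Resolves only if every event actually carries fields.log_topic;
  # otherwise the event is dropped with "no topic could be selected".
  topic: '%{[fields.log_topic]}'
```

Because the value is static, this would send every event to the same topic, so on its own it does not route by processor.event.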

You may use the topics configuration option, for example:

  topics:
    - topic: "transactions"
      when.contains:
        processor.event: "transaction"
    - topic: "errors"
      when.contains:
        processor.event: "error"

(I didn't try it out, so the trial and error is on you.)

Thanks for the suggestion.
You mean the topics configuration option?
Does it only support filtering by transaction or error?

Yes, sorry, fixed.

It is only an example. If it works, you can do the same for metric and span events.
As I said, I never tried it out; I just found it in the documentation and it seems to fit what you are looking for.
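
Extending that example to the other event types could look like the sketch below. It is untested, like the original suggestion. The plural topic names and the fallback topic `apm-other` are placeholders, not names from the documentation. Note that the debug log earlier in this thread shows a metric event on 7.3.1 still carrying `@metadata.topic: onboarding` with a very similar rule set, so this configuration may not be enough on that version.

```yaml
output.kafka:
  hosts: ["localhost:9092"]
  # Fallback for events that match none of the rules below
  # ("apm-other" is a hypothetical name).
  topic: "apm-other"
  topics:
    # Rules are checked in order; the first matching rule sets the topic.
    - topic: "transactions"
      when.contains:
        processor.event: "transaction"
    - topic: "errors"
      when.contains:
        processor.event: "error"
    - topic: "spans"
      when.contains:
        processor.event: "span"
    - topic: "metrics"
      when.contains:
        processor.event: "metric"
```

With a static fallback topic, an event that matches no rule should still get a topic instead of being dropped with "no topic could be selected".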
