I want to send my APM Server output to Kafka. Following the official documentation, I configured the output.kafka section of apm-server.yml:
output.kafka:
  hosts: ["localhost:9092"]
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
I started apm-server, but found this error message in the apm-server log:
Dropping event: no topic could be selected.
It seems that apm-server cannot find a fields.log_topic field in the event.
So I changed the topic in the output.kafka section of apm-server.yml to:
topic: '%{[log_topic]}'
I restarted apm-server, but the error message is still there:
Dropping event: no topic could be selected
Here is an example APM Server event (a metric):
{
"@timestamp": "2019-12-29T07:50:11.704Z",
"@metadata": {
"beat": "apm-server",
"type": "_doc",
"version": "7.3.1",
"pipeline": "apm"
},
"system": {
"memory": {
"actual": {
"free": 9.9181379584e+10
},
"total": 1.34908739584e+11
},
"process": {
"cpu": {
"total": {
"norm": {
"pct": 0
}
}
},
"memory": {
"size": 1.6571547648e+10
}
},
"cpu": {
"total": {
"norm": {
"pct": 0
}
}
}
},
"processor": {
"name": "metric",
"event": "metric"
},
"service": {
"runtime": {
"name": "Java",
"version": "1.8.0_222"
},
"name": "demo2-employee-service",
"language": {
"name": "Java",
"version": "1.8.0_222"
}
},
"agent": {
"name": "java",
"version": "1.9.0",
"ephemeral_id": "66172de9-40e5-43eb-b62b-127c6d3cde19"
},
"host": {
"hostname": "localhost.localdomain",
"architecture": "amd64",
"os": {
"platform": "Linux"
},
"ip": "218.94.19.122"
},
"ecs": {
"version": "1.0.1"
},
"jvm": {
"gc": {
"alloc": 1.62079504e+08
},
"memory": {
"heap": {
"committed": 2.147483648e+09,
"used": 1.31290632e+08,
"max": 2.147483648e+09
},
"non_heap": {
"max": -1,
"used": 1.14382576e+08,
"committed": 1.18685696e+08
}
},
"thread": {
"count": 41
}
},
"observer": {
"id": "1b777420-fcbe-4dba-bb53-50e67f2600be",
"ephemeral_id": "df756826-54ad-4b1d-9f79-164b73598091",
"type": "apm-server",
"hostname": "localhost.localdomain",
"version": "7.3.1",
"version_major": 7
},
"process": {
"pid": 19600,
"title": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java"
}
}
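To double-check the field references against this event, here is a minimal sketch in plain Python of how a '%{[...]}' reference could be resolved. It is only an illustration, not the Beats implementation: resolve_topic and lookup are hypothetical helpers, and the event is a trimmed copy of the one above.

```python
import re

# Trimmed copy of the metric event above; only the fields relevant here.
event = {
    "processor": {"name": "metric", "event": "metric"},
    "service": {"name": "demo2-employee-service"},
}

# Matches a field reference of the form %{[some.dotted.key]}.
FIELD_REF = re.compile(r"%\{\[([^\]]+)\]\}")


def lookup(event, dotted_key):
    """Walk a dotted key such as 'processor.event' through nested dicts."""
    node = event
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def resolve_topic(template, event):
    """Return the expanded topic, or None if a referenced field is missing."""
    missing = False

    def substitute(match):
        nonlocal missing
        value = lookup(event, match.group(1))
        if value is None:
            missing = True
            return ""
        return str(value)

    topic = FIELD_REF.sub(substitute, template)
    return None if missing else topic


for template in ("%{[fields.log_topic]}", "%{[log_topic]}", "%{[processor.event]}"):
    print(template, "->", resolve_topic(template, event))
```

Under these assumptions, the event has neither a fields.log_topic nor a log_topic field, so both of those templates resolve to nothing, which would fit the "no topic could be selected" message. processor.event does exist in the event.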
I want to create topics according to processor.event; for the sample above, the topic would be metric. So I changed my apm-server.yml configuration to:
topic: '%{[processor.event]}'
I restarted apm-server, but the topic that gets created is onboarding.
...
I changed the log level in apm-server.yml to debug and found the following messages:
2019-12-29T15:40:58.822+0800 INFO [onboarding] beater/onboarding.go:31 Publishing onboarding document
2019-12-29T15:40:58.822+0800 DEBUG [processors] processing/processors.go:183 Publish event: {
"@timestamp": "2019-12-29T07:40:58.822Z",
"@metadata": {
"beat": "apm-server",
"type": "_doc",
"version": "7.3.1",
"pipeline": "apm"
},
"processor": {
"event": "onboarding",
"name": "onboarding"
},
"observer": {
"listening": "218.94.19.122:18200",
"id": "1b777420-fcbe-4dba-bb53-50e67f2600be",
"ephemeral_id": "2761fdb3-871e-4404-ad12-4512aecfbedb",
"type": "apm-server",
"hostname": "localhost.localdomain",
"version": "7.3.1",
"version_major": 7
},
"ecs": {
"version": "1.0.1"
}
}
2019-12-29T15:40:59.823+0800 INFO pipeline/output.go:95 Connecting to kafka(localhost:9092)
2019-12-29T15:40:59.823+0800 DEBUG [kafka] kafka/client.go:93 connect: [localhost:9092]
2019-12-29T15:40:59.823+0800 INFO kafka/log.go:53 kafka message: Initializing new client
2019-12-29T15:40:59.823+0800 INFO kafka/log.go:53 kafka message: Successfully initialized new client
2019-12-29T15:40:59.823+0800 INFO pipeline/output.go:105 Connection to kafka(localhost:9092) established
2019-12-29T15:40:59.824+0800 INFO kafka/log.go:53 client/metadata fetching metadata for [onboarding] from broker localhost:9092
2019-12-29T15:40:59.828+0800 INFO kafka/log.go:53 Connected to broker at localhost:9092 (unregistered)
2019-12-29T15:40:59.832+0800 INFO kafka/log.go:53 client/brokers registered new broker #0 at localhost:9092
2019-12-29T15:40:59.832+0800 INFO kafka/log.go:53 producer/broker/0 starting up
2019-12-29T15:40:59.832+0800 INFO kafka/log.go:53 producer/broker/0 state change to [open] on onboarding/0
So is the topic selected from Filebeat event fields rather than APM Server event fields?
What I actually want is to route my apm-server output by APM Server event type, such as metric, error, transaction, span, and so on. Any suggestions on how to configure the topic?
I changed the topic configuration as follows:
topic: 'logs-%{[beat.version]}'
topics:
  - topic: sourcemap
    when.contains:
      processor.event: "sourcemap"
  - topic: error
    when.contains:
      processor.event: "error"
  - topic: transaction
    when.contains:
      processor.event: "transaction"
  - topic: span
    when.contains:
      processor.event: "span"
  - topic: metric
    when.contains:
      processor.event: "metric"
  - topic: onboarding
    when.contains:
      processor.event: "onboarding"
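What I expect these rules to do, as a minimal sketch in plain Python: the first rule whose condition matches picks the topic, otherwise the default topic setting is used. This is my reading of the documented behavior, not how apm-server implements it; select_topic, matches, and lookup are hypothetical helpers, and the contains check only handles string fields.

```python
# Default from the 'topic' setting, left unexpanded in this sketch.
DEFAULT_TOPIC = "logs-%{[beat.version]}"

EVENT_TYPES = ["sourcemap", "error", "transaction", "span", "metric", "onboarding"]

# One rule per event type, in the same order as the 'topics' list.
RULES = [{"topic": name, "contains": {"processor.event": name}} for name in EVENT_TYPES]


def lookup(event, dotted_key):
    """Walk a dotted key such as 'processor.event' through nested dicts."""
    node = event
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def matches(event, contains):
    """True if every listed field is a string containing the given substring."""
    for key, needle in contains.items():
        value = lookup(event, key)
        if not isinstance(value, str) or needle not in value:
            return False
    return True


def select_topic(event):
    """Return the topic of the first matching rule, else the default topic."""
    for rule in RULES:
        if matches(event, rule["contains"]):
            return rule["topic"]
    return DEFAULT_TOPIC


print(select_topic({"processor": {"name": "metric", "event": "metric"}}))
print(select_topic({"processor": {"name": "onboarding", "event": "onboarding"}}))
```

By that reading, a metric event should land in the metric topic and only the onboarding document should land in onboarding.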
But the topic is still onboarding. Here is the debug log:
2019-12-30T10:01:03.773+0800 DEBUG [processors] processing/processors.go:183 Publish event: {
"@timestamp": "2019-12-30T02:00:53.767Z",
"@metadata": {
"beat": "apm-server",
"type": "_doc",
"version": "7.3.1",
"pipeline": "apm",
"topic": "onboarding",
"partition": 0
},
"ecs": {
"version": "1.0.1"
},
"processor": {
"name": "metric",
"event": "metric"
},
"host": {
"hostname": "localhost.localdomain",
"architecture": "amd64",
"os": {
"platform": "Linux"
},
"ip": "218.94.19.122"
},
"labels": {
"name": "G1 Young Generation"
},
"observer": {
"type": "apm-server",
"hostname": "localhost.localdomain",
"version": "7.3.1",
"version_major": 7,
"id": "1b777420-fcbe-4dba-bb53-50e67f2600be",
"ephemeral_id": "7b445a73-adf8-4dfb-b2d1-07015c18e40f"
},
"jvm": {
"gc": {
"count": 26,
"time": 1057
}
},
"service": {
"language": {
"name": "Java",
"version": "1.8.0_222"
},
"runtime": {
"name": "Java",
"version": "1.8.0_222"
},
"name": "dem2-config-server-service"
},
"agent": {
"name": "java",
"version": "1.9.0",
"ephemeral_id": "761c5477-ed6a-473e-a0d4-02f6bda633f5"
},
"process": {
"title": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java",
"pid": 17208
}
}
2019-12-30T10:01:04.781+0800 DEBUG [kafka] kafka/client.go:264 finished kafka batch
2019-12-30T10:01:04.781+0800 DEBUG [publisher] memqueue/ackloop.go:160 ackloop: receive ack [49: 0, 3]