Hi all,
I am using Filebeat with ingest pipelines, and I suspect that one of them stalls the ingest node from time to time.
When the pipeline runs under load, the bulk thread pool fills up and starts rejecting requests.
I see EsRejectedExecutionException in the log, the queue is maxed out (500), and the "completed" value of the bulk thread pool no longer changes:
GET /_nodes/stats/thread_pool
(..)
"thread_pool": {
"bulk": {
"threads": 12,
"queue": 500,
"active": 12,
"rejected": 6063,
"largest": 12,
"completed": 34109
},
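In case that output is easier to compare over time, the cat API should show the same picture (the column list here is just my choice):
GET _cat/thread_pool/bulk?v&h=node_name,name,active,queue,rejected,completed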
All 12 CPU cores are stuck at 100% and the bulk tasks never finish; their "running_time_in_nanos" keeps going up:
GET _tasks?pretty
(..)
"tasks": {
"nBBzTCUCR3SmV-NRfhGmvQ:57344": {
"node": "nBBzTCUCR3SmV-NRfhGmvQ",
"id": 57344,
"type": "transport",
"action": "indices:data/write/bulk",
"start_time_in_millis": 1515672022666,
"running_time_in_nanos": 12585319138455,
"cancellable": false
},
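I can also filter the task list down to the bulk actions (the parameters below are just my guess at the relevant ones), but I don't know whether the detailed output tells me more than the running time:
GET _tasks?detailed&actions=indices:data/write/bulk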
As far as I understand it, all threads are used by the same pipeline:
GET /_nodes/stats/ingest
(..)
"ingest": {
"total": {
"count": ..,
"time_in_millis": 6046,
"current": 12, <========== total
"failed": 0
},
..
"ait_appng-log-pipeline": {
"count": ..,
"time_in_millis": 438,
"current": 12, <========== the only pipeline that has a "current" value greater zero
"failed": 0
},
No more messages are indexed until I restart the node.
My Elasticsearch version is 5.6.5 (with Lucene 6.6.1).
I'm not sure whether my assumption that the pipeline causes the problem is correct. But if it is, is there a way to dig deeper and find out what the ingest node is actually doing and why it blocks?
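I suppose something like the hot threads API is the place to look, but I'm not sure what to watch for in its output in this situation:
GET /_nodes/hot_threads?threads=12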
This is the pipeline:
GET _ingest/pipeline/ait_appng-log-pipeline
{
  "ait_appng-log-pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "ignore_missing": true,
          "pattern_definitions": {
            "SESSION_ID": "[0-9A-F]{32}",
            "UNIQUE_ID": "(?:[0-9A-Fa-f_]+:){3}[0-9A-Fa-f_]+",
            "APPNG_USER": "(?:%{EMAILADDRESS}|%{USERNAME})",
            "GREEDYMULTILINE": "(.|\n)*"
          },
          "patterns": [
            """^%{TIMESTAMP_ISO8601:appng.core.log.timestamp}%{SPACE}%{LOGLEVEL:appng.core.log.loglevel}%{SPACE}\[%{DATA:appng.core.log.thread}\] \[(?:%{UNIQUE_ID:haproxy.unique_id})? (?:%{SESSION_ID:appng.core.log.session_id})? (?:%{APPNG_USER:appng.core.log.username})?@(?:%{WORD:appng.core.log.locale})? (?:%{URIPROTO:appng.core.log.ffor_proto})? (?:%{PATH:appng.core.log.path})?\] %{DATA:appng.core.log.category}: %{GREEDYMULTILINE:appng.core.log.message1}"""
          ]
        }
      },
      {
        "grok": {
          "field": "appng.core.log.message1",
          "ignore_missing": true,
          "ignore_failure": true,
          "pattern_definitions": {
            "JAVA_EXCEPTION": """(?:[a-zA-Z0-9_-]+\.)+[a-zA-Z0-9_-]*Exception\b"""
          },
          "patterns": [
            "\\n%{JAVA_EXCEPTION:appng.core.log.exception}"
          ]
        }
      },
      {
        "remove": {
          "field": "appng.core.log.message1",
          "ignore_failure": true
        }
      },
      {
        "date": {
          "field": "appng.core.log.timestamp",
          "target_field": "@timestamp",
          "formats": [
            "YYYY-MM-dd HH:mm:ss,SSS"
          ],
          "timezone": "Europe/Berlin"
        }
      },
      {
        "remove": {
          "field": "appng.core.log.timestamp",
          "ignore_failure": true
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "error",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}
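I assume the simulate API with the verbose flag is the right way to replay a single line through the pipeline; something like this (the sample message is made up, my real lines are longer and often contain stack traces):
POST _ingest/pipeline/ait_appng-log-pipeline/_simulate?verbose
{
  "docs": [
    {
      "_source": {
        "message": "2018-01-11 12:20:22,666 INFO  [main] [  @  ] org.example.SomeClass: a made-up sample line"
      }
    }
  ]
}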
Best regards,
Dirk