Hi, I have several ingest pipelines, each with quite a large set of processors configured, one pipeline per data stream family. For example, pipeline "2g_names" works with the "raw_kpi_2g_*" data streams (raw_kpi_2g_1), "3g_names" with "raw_kpi_3g_*" (raw_kpi_3g_1, raw_kpi_3g_2, raw_kpi_3g_3 & raw_kpi_3g_4), and "4g_names" with "raw_kpi_4g_*" (raw_kpi_4g_1, raw_kpi_4g_2, raw_kpi_4g_3, raw_kpi_4g_4 & raw_kpi_4g_5).
The Elasticsearch version is 7.12.1.
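In case it matters: each pipeline is attached to its data streams as the default pipeline of the matching index template. Simplified (the template name and settings below are placeholders, not my exact configuration), it looks something like this:

PUT _index_template/raw_kpi_3g
{
  "index_patterns" : [ "raw_kpi_3g_*" ],
  "data_stream" : { },
  "template" : {
    "settings" : {
      "index.default_pipeline" : "3g_names"
    }
  }
}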
This is one of the pipelines ("3g_names"):
{
  "3g_names" : {
    "description" : "Enrich 3G data with names",
    "processors" : [
      {
        "enrich" : {
          "field" : "RNC.dn",
          "policy_name" : "mo_name",
          "target_field" : "RNC",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "enrich" : {
          "field" : "WBTS.dn",
          "policy_name" : "mo_name",
          "target_field" : "WBTS",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "enrich" : {
          "field" : "WCEL.dn",
          "policy_name" : "mo_name",
          "target_field" : "WCEL",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "enrich" : {
          "field" : "WBTS.name",
          "policy_name" : "location_data",
          "target_field" : "location",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "set" : {
          "field" : "location.name",
          "value" : "{{location.shortName}}",
          "ignore_failure" : true
        }
      },
      {
        "remove" : {
          "field" : "location.shortName",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "enrich" : {
          "field" : "location.name",
          "policy_name" : "tech_correlation",
          "target_field" : "general",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "enrich" : {
          "field" : "location.name",
          "policy_name" : "jefaturas_data_v2",
          "target_field" : "jefaturas",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "enrich" : {
          "field" : "WBTS.name",
          "policy_name" : "site_name",
          "target_field" : "temp",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "set" : {
          "field" : "location.siteName",
          "value" : "{{temp.siteName}}",
          "ignore_empty_value" : true,
          "ignore_failure" : true
        }
      },
      {
        "remove" : {
          "field" : "temp",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      }
    ]
  }
}
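When I simulate the pipeline with a sample document, everything works. This is roughly how I test it (the dn values below are made up for illustration):

POST _ingest/pipeline/3g_names/_simulate
{
  "docs" : [
    {
      "_source" : {
        "RNC" : { "dn" : "PLMN-PLMN/RNC-1" },
        "WBTS" : { "dn" : "PLMN-PLMN/RNC-1/WBTS-10" },
        "WCEL" : { "dn" : "PLMN-PLMN/RNC-1/WBTS-10/WCEL-101" }
      }
    }
  ]
}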
But when I send a large bulk request to index data, it fails with the following error:
{
  "processor_results" : [
    {
      "processor_type" : "enrich",
      "status" : "error_ignored",
      "ignored_error" : {
        "error" : {
          "root_cause" : [
            {
              "type" : "es_rejected_execution_exception",
              "reason" : "Could not perform enrichment, enrich coordination queue at capacity [1024/1024]"
            }
          ],
          "type" : "es_rejected_execution_exception",
          "reason" : "Could not perform enrichment, enrich coordination queue at capacity [1024/1024]"
        }
      }
    }
  ]
}
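From what I can tell in the docs, the 1024 limit comes from the enrich coordinator node settings: enrich.coordinator_proxy.queue_capacity defaults to max_concurrent_requests * max_lookups_per_request, i.e. 8 * 128 = 1024. I could try raising them in elasticsearch.yml, for example (the values here are just a guess):

enrich.coordinator_proxy.max_concurrent_requests: 16
enrich.coordinator_proxy.queue_capacity: 2048

But I don't know whether that is the right fix or whether it just moves the bottleneck, compared to, say, sending smaller bulk requests.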
Here are some statistics for the node (we only have one) over the last 7 days:
Here are some statistics for one of the backing indices (the write index) of one data stream:
I couldn't get the indexing rate of each data stream; if someone can tell me how to build that query, I will be happy to provide the information.
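My guess is that it would be based on the index stats API, something like:

GET raw_kpi_3g_*/_stats/indexing

but those counters are cumulative totals, and I am not sure how to turn them into a per-data-stream rate.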
What approach could be taken in this situation to solve the problem?
Regards.