Hi gareth-ellis,
Thank you for the response. It is not a production cluster, though clients are using it; in any case, we are now testing on a different, unused cluster.
The race I performed was for just 60k documents. While following the blog How to benchmark Elasticsearch performance with ingest pipelines and your own logs | Elastic Blog to add a custom pipeline, I was getting a syntax error because my pipeline uses a script processor; a pipeline with only a single rename processor worked fine. This is the pipeline:
{"processors":[{"rename":{"ignore_failure":true,"field":"_raw","target_field":"Event"}},{"set":{"field":"ingestion_time","value":"{{{_ingest.timestamp}}}","on_failure":[{"set":{"field":"ingest_time_failure","value":true}}]}},{"grok":{"ignore_missing":true,"field":"Event","patterns":["%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}%{SPACE}%{SPACE}DB :\\s*(?<sql_query>\\{.*\\})","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}%{SPACE}org\\.hibernate\\.SQL\\s+-\\s*(?m)(?<sql_query>.*)","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}%{SPACE}org\\.hibernate\\.SQL\\s+-\\s*(?<sql_query>.*)","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}o\\.s\\.jdbc\\.core\\.JdbcTemplate - Executing prepared SQL statement \\[%{GREEDYDATA:sql_query}\\]","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}"],"on_failure":[{"set":{"field":"grokError1","value":true}}]}},{"script":{"source":"\r // Initialize variables\r List brace_stack = new ArrayList();\r List nested_objects = new ArrayList();\r List chunks = new ArrayList();\r String current_chunk = \"\";\r String log_message = ctx.rest; // Assuming `event.rest` holds the log message\r \t\t List allowedFields = ['Error', 'app_name', 'app_env', 'transactionCount','action', 'client-id', 'execTime', 'status', 'returnCode', 'logType', 'message', 'source', 'paymentSubType', 'divisionId', '_time', 'requestId', 'validateTransactionId', 'authTransactionId', 'batchType','notificationType', 'subBatchId', 'itemType', 'bpId', 'paymentTokenId', 'mopType', 'paymentDate', 'paymentType', 'itemId', 'itemAmount', 'accountNumber', 'amount', 'actionCode', 'cpsTransactionId', 'cpsTransactionStatus', 'sourceId', 'adjAmount', 'adjType', 'adjReason', 'exceptionType', 'originalDepositTransactionId', 'billerAccountNumber', 'cardType', 'accountType', 'cardHolderName', 'accountHolderName', 'terminal_id', 'spcDivisionNumber', 'statementCode', 'paymentMethodType','transaction_id', 'transaction-id', 'SPSTransactionId', 'contentFilename', 'user_id', 'biller_account_number', 'payload', 'billingAccountNo', 'reasonCode', 'reasonText', 'sourceBiller', 'adjustmentType', 'file_remoteFile', 'itemDueDate', 'hostname', 'error_1', 'batchId', 'lastFourNumbersOfBankAccount', 'invalidReasonScenario', 'STPFileName', 'providerResponseCode', 'providerMessage', 'CSGFileCount', 'spsBatchId', 'OtpPrepTotal', 'referenceId', 'description', 'adjustmentReasonCode', 'adjustmentCode', 'systemErrorCode', 'spmId', 'messageindex', 'actionindex', 'providerMessageindex', 'transactionResponseStatus', 'templateId', 'Errorindex', 'notificationSeq'];\r List doubleFields = [\"amount\",\"transactionCount\"];\r \r Set allowed = new HashSet();\r for (def field : allowedFields) {\r allowed.add(field);\r }\r \r // Process the log message character by character\r for (int pos = 0; pos < log_message.length(); pos++) {\r String chr = log_message.substring(pos, pos + 1);\r \r // If space is encountered, process the current chunk\r if (chr == \" \") {\r if (current_chunk.contains(\"=\")) {\r if (current_chunk.length() > 1) {\r // Clean up the chunk\r current_chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(current_chunk).replaceAll(\"\");\r \r String[] result = current_chunk.splitOnToken(\"=\", 2);\r if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0])) {\r result[1] = result[1] == \"\" ? 
\"null\" : result[1];\r \t\t\t\t\t\t\t \r \t\t\t\t\t\t\t \r \t\t\t\t\t\t\t \r if (result[0] == \"execTime\" && result[1] != null) {\r result[1] = result[1].replace(\"ms\", \"\");\r int value = Integer.parseInt(result[1]);\r \r }\r \t\t\t\t\t\t\t \r \t\t\t\t\t\t\t if (result[0].contains(\".\")) {\r \t\t\t\t\t\t\t result[0] = result[0].replace(\".\", \"_\");\r \t\t\t\t\t\t\t }\r \r if (result[0] == \"host\" && result[1] != null) {\r \t\t\t\t\t\t\t\t result[0] = \"app_host\";\r }\r \t\t\t\t\t\t\t if (result[0] == \"eft\" && result[1] == null) {\r \t\t\t\t\t\t\t\t result[0] = \"app_eft\";\r }\r \t\t\t\t\t\t\t if (result[0] == \"assignedTo\" && result[1] == null) {\r \t\t\t\t\t\t\t\t result[0] = \"app_assignedTo\";\r }\t\t\t\t\t\t\t \r \t\t\t\t\t\t\t if (result[0] == \"pending\" && (result[1] == \"true\" || result[1] == \"false\")) {\r \t\t\t\t\t\t\t\t result[0] = \"app_pending\";\r } \t\t\t\t\t\t\t \r \r if ((result[0].startsWith(\"http\") || result[0].startsWith(\"https\")) && result[0].contains(\"?\")) {\r \r \t\t\t\t\t\t\t\tString[] httpValue = result[0].splitOnToken(\"?\",2);\r \t\t\t\t\t\t\t\tresult[0]=httpValue[1];\r }\r \t\t\t\t\t\t\t \r \t\t\t\t\t\t\t if (!ctx.containsKey(result[0])) {\r if (doubleFields.contains(result[0])) {\r try {\r if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r ctx[result[0]] = 0.0;\r } else {\r ctx[result[0]] = Double.parseDouble(result[1].toString());\r }\r } catch (Exception e) {\r ctx[result[0]] = 0.0;\r }\r } else {\r ctx[result[0]] = result[1];\r }\r \t\t\t\t\t\t\t }\r }\r }\r }\r \t\t\t\t \r \t\t\t\t if (!current_chunk.contains(\"=\") && chunks.size() > 0) \r {\r String new_chunk = chunks.remove(chunks.size() - 1) + \" \" + current_chunk.trim();\r current_chunk = new_chunk;\r \t\t\t\t\t if (!current_chunk.contains(\"logType=\") && !current_chunk.contains(\"execTime=\") && !current_chunk.contains(\"?\") && !current_chunk.contains(\"spsBatchId=\") && !current_chunk.contains(\"app_name=\") && !current_chunk.contains(\"batchId=\") && !current_chunk.contains(\"action=\")) {\r current_chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(current_chunk).replaceAll(\"\");\r String[] result = current_chunk.splitOnToken(\"=\", 2);\r if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0])) {\r if (doubleFields.contains(result[0])) {\r try {\r if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r ctx[result[0]] = 0.0;\r } else {\r ctx[result[0]] = Double.parseDouble(result[1].toString());\r }\r } catch (Exception e) {\r ctx[result[0]] = 0.0;\r }\r } else {\r ctx[result[0].trim()] = result[1];\r }\r \t\t\t\t\t }\r \t\t\t\t\t }\r }\r \t\t\t\t \r if (!current_chunk.isEmpty()) {\r chunks.add(current_chunk); // Add the chunk to the list\r }\r current_chunk = \"\"; // Reset for next chunk\r } else {\r current_chunk += chr; // Accumulate character to current chunk\r }\r \r // Handle opening braces\r if (chr == \"[\" || chr == \"(\" || chr == \"{\" || chr == \"<\") {\r if (current_chunk.contains(\"=\") && !current_chunk.contains(\",\")) {\r brace_stack.add([ \"brace\": chr, \"position\": pos, \"valid\": true, \"current_chunk\": current_chunk ]);\r current_chunk = \"\"; // Reset current chunk\r } else {\r brace_stack.add([ \"brace\": chr, \"position\": pos, \"valid\": false ]);\r current_chunk = \"\"; // Reset current chunk\r }\r }\r \r // Handle closing braces\r else if (chr == \"]\" || chr == \")\" || chr == \"}\" || chr == \">\") {\r if 
(!brace_stack.isEmpty()) {\r Map start = brace_stack.remove(brace_stack.size() - 1); // Pop the last element from the stack\r Map nested_object = new HashMap();\r nested_object.put(\"start_brace\", start.get(\"brace\"));\r nested_object.put(\"start_position\", start.get(\"position\"));\r nested_object.put(\"valid_key\", start.get(\"valid\"));\r nested_object.put(\"current_chunk\", start.get(\"current_chunk\"));\r nested_object.put(\"end_brace\", chr);\r nested_object.put(\"end_position\", pos);\r \r nested_objects.add(nested_object); // Add the nested object to the list\r }\r }\r }\r \t\t \r \t\t if (!current_chunk.isEmpty() && !current_chunk.startsWith(\"http\"))\r \t\t {\r if (current_chunk.contains(\"}}\")) \r \t\t\t {\r String[] result1 = current_chunk.splitOnToken(\"}}\", 2);\r \r \t\t\t\t for (int i = 0; i < result1.length; i++)\r \t\t\t\t {\r \t\t\t\t\t if (result1.length > 0 && result1[i].contains(\"=\") && !result1[i].contains(\":\")) \r \t\t\t\t {\r String chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(result1[i]).replaceAll(\"\");\r String[] result = chunk.splitOnToken(\"=\", 2);\r if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0]))\r \t\t\t\t\t {\r \t\t\t\t\t\t\t if (result[0].contains(\".\")) {\r \t\t\t\t\t\t\t result[0] = result[0].replace(\".\", \"_\");\r \t\t\t\t\t\t\t }\r \t\t\t\t\t\t\t \r result[1] = result[1] == \"\" ? \"null\" : result[1];\r String key = result[0].trim().replace(\"}\", \"\");\r \t\t\t\t\t\t\t\t \r if (doubleFields.contains(key)) {\r try {\r if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r ctx[key] = 0.0;\r } else {\r ctx[key] = Double.parseDouble(result[1].toString());\r }\r } catch (Exception e) {\r ctx[key] = 0.0;\r }\r } else {\r ctx[key.trim()] = result[1];\r }\r }\r } \r \t\t\t\t }\t \r \t\t } \r \t\t\t else if (current_chunk.contains(\"=\")) \r \t\t\t {\r // If no \"}}\", process current_chunk normally\r if (current_chunk.length() > 1) \r \t\t\t\t {\r current_chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(current_chunk).replaceAll(\"\");\r String[] result = current_chunk.splitOnToken(\"=\", 2);\r if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0])) \r \t\t\t\t\t {\r \t\t\t\t\t \t\tif (result[0].contains(\".\")) {\r \t\t\t\t\t\t\t result[0] = result[0].replace(\".\", \"_\");\r \t\t\t\t\t\t\t }\r result[1] = result[1] == \"\" ? 
\"null\" : result[1];\r \r if (doubleFields.contains(result[0])) {\r try {\r if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r ctx[result[0]] = 0.0;\r } else {\r ctx[result[0]] = Double.parseDouble(result[1].toString());\r }\r } catch (Exception e) {\r ctx[result[0]] = 0.0;\r }\r } else {\r ctx[result[0].trim()] = result[1];\r }\r }\r }\r }\r }\r \r \r // Process the nested objects to extract them as key-value pair\r List segments = new ArrayList();\r for (Map obj : nested_objects) {\r if ((boolean) obj.get(\"valid_key\")) {\r String segment = log_message.substring((int) obj.get(\"start_position\"), (int) obj.get(\"end_position\") + 1);\r segment = obj.get(\"current_chunk\") + segment.substring(1); // Replace the first character with the current chunk\r String[] objResult = segment.splitOnToken(\"=\", 2);\r if (objResult.length == 2 && objResult[0] != null && !objResult[0].isEmpty() && !objResult[0].trim().isEmpty() && allowed.contains(objResult[0])) {\r \t\t\t\t if ((objResult[0].startsWith(\"http\") || objResult[0].startsWith(\"https\")) && objResult[0].contains(\"?\")) {\r \t\t\t\t\t\t\tString[] nestedhttpValue = objResult[0].splitOnToken(\"?\",2);\r \t\t\t\t\t\t\tobjResult[0]=nestedhttpValue[1];\r } else {\r \t\t\t\t\t\t \tif (!ctx.containsKey(objResult[0])) {\r \t\t\t\t\t\t\t\r \t\t\t\t\t\t\tif (objResult[0].contains(\".\")) {\r \t\t\t\t\t\t\t\t objResult[0] = objResult[0].replace(\".\", \"_\");\r }\r if (doubleFields.contains(objResult[0])) {\r try {\r if (objResult[1] == null || objResult[1] == \"null\" || objResult[1].trim() == \"\") {\r ctx[objResult[0]] = 0.0;\r } else {\r ctx[objResult[0]] = Double.parseDouble(objResult[1].toString());\r }\r } catch (Exception e) {\r ctx[objResult[0]] = 0.0;\r }\r } else {\r ctx[objResult[0].trim()] = objResult[1];\r }\r \t\t\t\t\t\t\t\t\r \t\t\t\t\t\t\t\t\r \t\t\t\t\t\t\t\t// Set the key-value pair in the event\r \t\t\t\t\t\t\t}\r \t\t\t\t\t\t }\r }\r Map segment_info = new HashMap();\r segment_info.put(\"nested_object\", \"Start nested object Brace: \" + obj.get(\"start_brace\") + \", Position: \" + obj.get(\"start_position\") + \" & End nested object Brace: \" + obj.get(\"end_brace\") + \", Position: \" + obj.get(\"end_position\"));\r segment_info.put(\"log_segment\", segment);\r segments.add(segment_info); // Store the segment information\r }\r }\r \r // If necessary, set the results back in the event\r // ctx.event.put(\"nested_objects\", segments); // Uncomment this if you need to return segments\r // ctx.event.put(\"current_chunk\", chunks); // Uncomment this if you need to return chunks\r 
","lang":"painless","on_failure":[{"set":{"field":"scriptError","value":true}}]}},{"rename":{"ignore_missing":true,"field":"service","target_field":"app_service"}},{"rename":{"ignore_missing":true,"field":"source","target_field":"app_source"}},{"rename":{"ignore_missing":true,"field":"user","target_field":"app_user"}},{"gsub":{"pattern":"ms","ignore_missing":true,"field":"execTime","replacement":""}},{"convert":{"ignore_missing":true,"field":"execTime","type":"long"}},{"rename":{"ignore_missing":true,"field":"error","target_field":"app_error"}},{"rename":{"ignore_missing":true,"field":"file","target_field":"app_file"}},{"rename":{"ignore_missing":true,"field":"url","target_field":"app_url"}},{"rename":{"ignore_missing":true,"field":"metadata","target_field":"app_metadata"}},{"remove":{"field":"rest","on_failure":[{"set":{"field":"rest_remove_failure","value":true}}]}},{"rename":{"field":"transaction-id","target_field":"transaction_id","ignore_missing":true}}]}
At the moment I am only referencing the pipeline in the challenge's default.json:
{
  "name": "my-challenge",
  "description": "My new challenge",
  "default": true,
  "schedule": [
    {
      "operation": "delete-index"
    },
    {
      "operation": {
        "operation-type": "create-index",
        "settings": {{index_settings | default({}) | tojson}}
      }
    },
    {
      "operation": {
        "operation-type": "bulk",
        "pipeline": "test-sps-pipeline",
        "bulk-size": {{bulk_size | default(5000)}},
        "ingest-percentage": {{ingest_percentage | default(100)}}
      },
      "clients": {{bulk_indexing_clients | default(8)}},
      "iterations": 106397,
      "target-throughput": 1230
    }
  ]
}
My assumption was that this challenge would iterate the ingestion of the 60k records, i.e. keep re-ingesting the 60k-record corpus until all iterations are completed, but that is not what happens: it indexes only once (approximately 58k records).
So if I create a track with a 1.6B-record index and add a parallel task that runs for 24 hours, will it keep indexing documents into the cluster for the full 24 hours and benchmark it? Something like the sketch below is what I have in mind.
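This is just a rough sketch of the schedule entry I am imagining, based on my reading of the Rally docs; the time-period value (86400 seconds = 24 hours) and the idea that a bulk task can be bounded this way are my own assumptions, so please correct me if that is not how it works:

{
  "parallel": {
    "time-period": 86400,
    "tasks": [
      {
        "operation": {
          "operation-type": "bulk",
          "pipeline": "test-sps-pipeline",
          "bulk-size": {{bulk_size | default(5000)}}
        },
        "clients": {{bulk_indexing_clients | default(8)}},
        "target-throughput": 1230
      }
    ]
  }
}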
Here is my current track.json file:
{
  "version": 2,
  "description": "Tracker-generated track for test-sps-track",
  "indices": [
    {
      "name": "rally_sps_logs"
    }
  ],
  "corpora": [
    {
      "name": ".ds-demo_sps_application_logs-8.17.1-2025.07.19-000002",
      "documents": [
        {
          "target-index": "rally_sps_logs",
          "source-file": "sps_logs.json.bz2",
          "document-count": 60001,
          "pipeline": "test-sps-pipeline"
        }
      ]
    }
  ],
  "operations": [{{ rally.collect(parts="operations/*.json") }}],
  "challenges": [{{ rally.collect(parts="challenges/*.json") }}]
}
I also tried creating the pipeline directly in the cluster instead of having esrally create it, but it still throws a script error.
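To narrow down that script error, my next step is to test the pipeline directly with the simulate API and a single sample document, something like the request below (the sample _raw value is made up just for illustration):

POST _ingest/pipeline/test-sps-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "_raw": "2025-07-19T10:15:30,123 INFO com.example.App - logType=payment execTime=12ms amount=10.50"
      }
    }
  ]
}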
With Regards,
Abhishek M