Esrally setup for a day with custom logs & ingest-pipeline for benchmarking the cluster

Hi All,

I am new to the Esrally setup. So far I have been able to create a track and run a race for my custom logs and ingest pipeline, but I need to run Rally for a full day against a cluster that ingests almost 1.6 billion records a day. I wanted to run Rally against this live index for a day or more, but I came to know that Rally can't be run against a real-time index on a cluster. So I need to know how to set up Esrally to run for a day so that it ingests and searches roughly 1.6 billion records per day.
Please help me with this setup to schedule Rally to run for a day and benchmark the cluster at 1.6 billion records per day.

With Regards,
Abhishek M

Hello,

So, we recommend you don't run esrally against a production cluster for a few reasons:

  1. To get an accurate view of what a cluster is capable of, nothing else should be going on in the cluster - otherwise there are unknowns that may affect the measured performance.
  2. Most of the tracks in the rally-tracks repo will, by default, delete all indices related to the track - so if you use one of those tracks as a base, there is a risk that you inadvertently delete your production indices.

As such, we recommend against running rally against a production cluster.

If you want to simulate your production workload, you will need to create a track. You mention you have already run create-track - does this track now contain the corpus of 1.6 billion records? If not, then either create-track should be used, or the data should be put into NDJSON format so that Rally can use it as the corpus.
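For reference, an NDJSON corpus is simply one complete JSON document per line (the field names and values below are purely illustrative, not taken from your data):

```json
{"@timestamp": "2025-07-19T10:15:00Z", "_raw": "2025-07-19T10:15:00 ... logType=payment execTime=12ms"}
{"@timestamp": "2025-07-19T10:15:01Z", "_raw": "2025-07-19T10:15:01 ... logType=refund execTime=8ms"}
```

Rally can then reference this file (optionally bz2-compressed) as a `source-file` in the track's corpora definition.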

Once the data is available, you also need to define the queries you want your track to run - what these look like will depend very much on your individual use case.

Finally, you could set up a parallel task (see the Track Reference in the Rally 2.12.0 documentation) and set the parallel task to run for 24 hours.
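As a rough sketch of that idea (the operation names here are placeholders - substitute the operations defined in your own track), a parallel block where each task runs for 24 hours (86400 seconds) might look something like:

```json
{
  "parallel": {
    "tasks": [
      {
        "operation": "bulk-ingest",
        "clients": 8,
        "time-period": 86400
      },
      {
        "operation": "my-search",
        "clients": 2,
        "time-period": 86400,
        "target-interval": 1
      }
    ]
  }
}
```

With `time-period` set, Rally repeats the task for the given duration rather than a fixed iteration count, which is what you want for a day-long run.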

Hi gareth-ellis,

Thank you for the response. It's not a production cluster, though clients are using it; in any case, we are now testing on a separate, unused cluster.
The race I ran was for just 60k records. Following this blog - How to benchmark Elasticsearch performance with ingest pipelines and your own logs | Elastic Blog - to add a custom pipeline, I was getting a syntax error because my pipeline uses a script processor; a single rename processor worked fine.

{"processors":[{"rename":{"ignore_failure":true,"field":"_raw","target_field":"Event"}},{"set":{"field":"ingestion_time","value":"{{{_ingest.timestamp}}}","on_failure":[{"set":{"field":"ingest_time_failure","value":true}}]}},{"grok":{"ignore_missing":true,"field":"Event","patterns":["%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}%{SPACE}%{SPACE}DB :\\s*(?<sql_query>\\{.*\\})","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}%{SPACE}org\\.hibernate\\.SQL\\s+-\\s*(?m)(?<sql_query>.*)","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}%{SPACE}org\\.hibernate\\.SQL\\s+-\\s*(?<sql_query>.*)","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}o\\.s\\.jdbc\\.core\\.JdbcTemplate - Executing prepared SQL statement \\[%{GREEDYDATA:sql_query}\\]","%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{GREEDYDATA:rest}"],"on_failure":[{"set":{"field":"grokError1","value":true}}]}},{"script":{"source":"\r           // Initialize variables\r           List brace_stack = new ArrayList();\r           List nested_objects = new ArrayList();\r           List chunks = new ArrayList();\r           String current_chunk = \"\";\r           String log_message = ctx.rest;  // Assuming `event.rest` holds the log message\r \t\t  List allowedFields = ['Error', 'app_name', 'app_env', 'transactionCount','action', 'client-id', 'execTime', 'status', 'returnCode', 'logType', 'message', 'source', 'paymentSubType', 'divisionId', '_time', 'requestId', 'validateTransactionId', 'authTransactionId', 'batchType','notificationType', 'subBatchId', 'itemType', 'bpId', 'paymentTokenId', 'mopType', 'paymentDate', 'paymentType', 'itemId', 'itemAmount', 'accountNumber', 'amount', 'actionCode', 'cpsTransactionId', 'cpsTransactionStatus', 'sourceId', 'adjAmount', 'adjType', 'adjReason', 'exceptionType', 'originalDepositTransactionId', 'billerAccountNumber', 'cardType', 'accountType', 'cardHolderName', 'accountHolderName', 'terminal_id', 'spcDivisionNumber', 'statementCode', 
'paymentMethodType','transaction_id', 'transaction-id', 'SPSTransactionId', 'contentFilename', 'user_id', 'biller_account_number', 'payload', 'billingAccountNo', 'reasonCode', 'reasonText', 'sourceBiller', 'adjustmentType', 'file_remoteFile', 'itemDueDate', 'hostname', 'error_1', 'batchId', 'lastFourNumbersOfBankAccount', 'invalidReasonScenario', 'STPFileName', 'providerResponseCode', 'providerMessage', 'CSGFileCount', 'spsBatchId', 'OtpPrepTotal', 'referenceId', 'description', 'adjustmentReasonCode', 'adjustmentCode', 'systemErrorCode', 'spmId', 'messageindex', 'actionindex', 'providerMessageindex', 'transactionResponseStatus', 'templateId', 'Errorindex', 'notificationSeq'];\r           List doubleFields = [\"amount\",\"transactionCount\"];\r \r           Set allowed = new HashSet();\r           for (def field : allowedFields) {\r             allowed.add(field);\r           }\r \r           // Process the log message character by character\r           for (int pos = 0; pos < log_message.length(); pos++) {\r               String chr = log_message.substring(pos, pos + 1);\r \r               // If space is encountered, process the current chunk\r               if (chr == \" \") {\r                   if (current_chunk.contains(\"=\")) {\r                       if (current_chunk.length() > 1) {\r                           // Clean up the chunk\r                           current_chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(current_chunk).replaceAll(\"\");\r                           \r                           String[] result = current_chunk.splitOnToken(\"=\", 2);\r                           if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0])) {\r                               result[1] = result[1] == \"\" ? 
\"null\" : result[1];\r \t\t\t\t\t\t\t  \r \t\t\t\t\t\t\t  \r \t\t\t\t\t\t\t  \r                               if (result[0] == \"execTime\" && result[1] != null) {\r                                   result[1] = result[1].replace(\"ms\", \"\");\r                                   int value = Integer.parseInt(result[1]);\r \r                               }\r \t\t\t\t\t\t\t  \r \t\t\t\t\t\t\t  if (result[0].contains(\".\")) {\r \t\t\t\t\t\t\t     result[0] = result[0].replace(\".\", \"_\");\r \t\t\t\t\t\t\t  }\r \r                              if (result[0] == \"host\" && result[1] != null) {\r \t\t\t\t\t\t\t\t  result[0] = \"app_host\";\r                               }\r \t\t\t\t\t\t\t if (result[0] == \"eft\" && result[1] == null) {\r \t\t\t\t\t\t\t\t  result[0] = \"app_eft\";\r                               }\r \t\t\t\t\t\t\t if (result[0] == \"assignedTo\" && result[1] == null) {\r \t\t\t\t\t\t\t\t  result[0] = \"app_assignedTo\";\r                               }\t\t\t\t\t\t\t  \r \t\t\t\t\t\t\t if (result[0] == \"pending\" && (result[1] == \"true\" || result[1] == \"false\")) {\r \t\t\t\t\t\t\t\t  result[0] = \"app_pending\";\r                               } \t\t\t\t\t\t\t  \r \r                             if ((result[0].startsWith(\"http\") || result[0].startsWith(\"https\")) && result[0].contains(\"?\")) {\r \r \t\t\t\t\t\t\t\tString[] httpValue = result[0].splitOnToken(\"?\",2);\r \t\t\t\t\t\t\t\tresult[0]=httpValue[1];\r                               }\r \t\t\t\t\t\t\t  \r \t\t\t\t\t\t\t  if (!ctx.containsKey(result[0])) {\r                                   if (doubleFields.contains(result[0])) {\r                                       try {\r                                           if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r                                               ctx[result[0]] = 0.0;\r                                           } else {\r                                               ctx[result[0]] = 
Double.parseDouble(result[1].toString());\r                                           }\r                                       } catch (Exception e) {\r                                           ctx[result[0]] = 0.0;\r                                       }\r                                   } else {\r                                       ctx[result[0]] = result[1];\r                                   }\r \t\t\t\t\t\t\t  }\r                           }\r                       }\r                   }\r \t\t\t\t  \r \t\t\t\t  if (!current_chunk.contains(\"=\") && chunks.size() > 0) \r                   {\r                        String new_chunk = chunks.remove(chunks.size() - 1) + \" \" + current_chunk.trim();\r                        current_chunk = new_chunk;\r \t\t\t\t\t   if (!current_chunk.contains(\"logType=\") && !current_chunk.contains(\"execTime=\") && !current_chunk.contains(\"?\") && !current_chunk.contains(\"spsBatchId=\") && !current_chunk.contains(\"app_name=\") && !current_chunk.contains(\"batchId=\") && !current_chunk.contains(\"action=\")) {\r                            current_chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(current_chunk).replaceAll(\"\");\r                            String[] result = current_chunk.splitOnToken(\"=\", 2);\r                           if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0])) {\r                             if (doubleFields.contains(result[0])) {\r                                 try {\r                                     if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r                                         ctx[result[0]] = 0.0;\r                                     } else {\r                                         ctx[result[0]] = Double.parseDouble(result[1].toString());\r                                     }\r                                 } catch (Exception e) {\r                               
      ctx[result[0]] = 0.0;\r                                 }\r                             } else {\r                                 ctx[result[0].trim()] = result[1];\r                             }\r \t\t\t\t\t      }\r \t\t\t\t\t    }\r                   }\r \t\t\t\t  \r                   if (!current_chunk.isEmpty()) {\r                       chunks.add(current_chunk);  // Add the chunk to the list\r                   }\r                   current_chunk = \"\";  // Reset for next chunk\r               } else {\r                   current_chunk += chr;  // Accumulate character to current chunk\r               }\r \r               // Handle opening braces\r               if (chr == \"[\" || chr == \"(\" || chr == \"{\" || chr == \"<\") {\r                   if (current_chunk.contains(\"=\") && !current_chunk.contains(\",\")) {\r                       brace_stack.add([ \"brace\": chr, \"position\": pos, \"valid\": true, \"current_chunk\": current_chunk ]);\r                       current_chunk = \"\";  // Reset current chunk\r                   } else {\r                       brace_stack.add([ \"brace\": chr, \"position\": pos, \"valid\": false ]);\r                       current_chunk = \"\";  // Reset current chunk\r                   }\r               }\r \r               // Handle closing braces\r               else if (chr == \"]\" || chr == \")\" || chr == \"}\" || chr == \">\") {\r                   if (!brace_stack.isEmpty()) {\r                       Map start = brace_stack.remove(brace_stack.size() - 1);  // Pop the last element from the stack\r                       Map nested_object = new HashMap();\r                       nested_object.put(\"start_brace\", start.get(\"brace\"));\r                       nested_object.put(\"start_position\", start.get(\"position\"));\r                       nested_object.put(\"valid_key\", start.get(\"valid\"));\r                       nested_object.put(\"current_chunk\", start.get(\"current_chunk\"));\r            
           nested_object.put(\"end_brace\", chr);\r                       nested_object.put(\"end_position\", pos);\r                       \r                       nested_objects.add(nested_object);  // Add the nested object to the list\r                   }\r               }\r           }\r \t\t  \r \t\t  if (!current_chunk.isEmpty() && !current_chunk.startsWith(\"http\"))\r \t\t  {\r               if (current_chunk.contains(\"}}\")) \r \t\t\t  {\r                   String[] result1 = current_chunk.splitOnToken(\"}}\", 2);\r                 \r \t\t\t\t  for (int i = 0; i < result1.length; i++)\r \t\t\t\t  {\r \t\t\t\t\t if (result1.length > 0 && result1[i].contains(\"=\") && !result1[i].contains(\":\")) \r \t\t\t\t         {\r                              String chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(result1[i]).replaceAll(\"\");\r                              String[] result = chunk.splitOnToken(\"=\", 2);\r                              if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0]))\r \t\t\t\t\t         {\r \t\t\t\t\t\t\t    if (result[0].contains(\".\")) {\r \t\t\t\t\t\t\t        result[0] = result[0].replace(\".\", \"_\");\r \t\t\t\t\t\t\t     }\r \t\t\t\t\t\t\t \r                                  result[1] = result[1] == \"\" ? 
\"null\" : result[1];\r                                  String key = result[0].trim().replace(\"}\", \"\");\r \t\t\t\t\t\t\t\t \r                                 if (doubleFields.contains(key)) {\r                                     try {\r                                         if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r                                             ctx[key] = 0.0;\r                                         } else {\r                                             ctx[key] = Double.parseDouble(result[1].toString());\r                                         }\r                                     } catch (Exception e) {\r                                         ctx[key] = 0.0;\r                                     }\r                                 } else {\r                                     ctx[key.trim()] = result[1];\r                                 }\r                              }\r                          }  \r \t\t\t\t  }\t  \r \t\t      } \r \t\t\t  else if (current_chunk.contains(\"=\")) \r \t\t\t  {\r                   // If no \"}}\", process current_chunk normally\r                   if (current_chunk.length() > 1) \r \t\t\t\t  {\r                       current_chunk = /[^a-zA-Z0-9\\\\s]+$/.matcher(current_chunk).replaceAll(\"\");\r                       String[] result = current_chunk.splitOnToken(\"=\", 2);\r                       if (result.length == 2 && result[0] != null && !result[0].isEmpty() && !result[0].trim().isEmpty() && allowed.contains(result[0])) \r \t\t\t\t\t  {\r \t\t\t\t\t  \t\tif (result[0].contains(\".\")) {\r \t\t\t\t\t\t\t     result[0] = result[0].replace(\".\", \"_\");\r \t\t\t\t\t\t\t  }\r                           result[1] = result[1] == \"\" ? 
\"null\" : result[1];\r \r                           if (doubleFields.contains(result[0])) {\r                               try {\r                                   if (result[1] == null || result[1] == \"null\" || result[1].trim() == \"\") {\r                                       ctx[result[0]] = 0.0;\r                                   } else {\r                                       ctx[result[0]] = Double.parseDouble(result[1].toString());\r                                   }\r                               } catch (Exception e) {\r                                   ctx[result[0]] = 0.0;\r                               }\r                           } else {\r                               ctx[result[0].trim()] = result[1];\r                           }\r                       }\r                   }\r               }\r           }\r           \r \r           // Process the nested objects to extract them as key-value pair\r           List segments = new ArrayList();\r           for (Map obj : nested_objects) {\r               if ((boolean) obj.get(\"valid_key\")) {\r                   String segment = log_message.substring((int) obj.get(\"start_position\"), (int) obj.get(\"end_position\") + 1);\r                   segment = obj.get(\"current_chunk\") + segment.substring(1);  // Replace the first character with the current chunk\r                   String[] objResult = segment.splitOnToken(\"=\", 2);\r                   if (objResult.length == 2 && objResult[0] != null && !objResult[0].isEmpty() && !objResult[0].trim().isEmpty() && allowed.contains(objResult[0])) {\r \t\t\t\t        if ((objResult[0].startsWith(\"http\") || objResult[0].startsWith(\"https\")) && objResult[0].contains(\"?\")) {\r \t\t\t\t\t\t\tString[] nestedhttpValue = objResult[0].splitOnToken(\"?\",2);\r \t\t\t\t\t\t\tobjResult[0]=nestedhttpValue[1];\r                          } else {\r \t\t\t\t\t\t \tif (!ctx.containsKey(objResult[0])) {\r \t\t\t\t\t\t\t\r \t\t\t\t\t\t\tif 
(objResult[0].contains(\".\")) {\r \t\t\t\t\t\t\t\t  objResult[0] = objResult[0].replace(\".\", \"_\");\r                               }\r                             if (doubleFields.contains(objResult[0])) {\r                                 try {\r                                     if (objResult[1] == null || objResult[1] == \"null\" || objResult[1].trim() == \"\") {\r                                         ctx[objResult[0]] = 0.0;\r                                     } else {\r                                         ctx[objResult[0]] = Double.parseDouble(objResult[1].toString());\r                                     }\r                                 } catch (Exception e) {\r                                     ctx[objResult[0]] = 0.0;\r                                 }\r                             } else {\r                                 ctx[objResult[0].trim()] = objResult[1];\r                             }\r \t\t\t\t\t\t\t\t\r \t\t\t\t\t\t\t\t\r \t\t\t\t\t\t\t\t// Set the key-value pair in the event\r \t\t\t\t\t\t\t}\r \t\t\t\t\t\t }\r                   }\r                   Map segment_info = new HashMap();\r                   segment_info.put(\"nested_object\", \"Start nested object Brace: \" + obj.get(\"start_brace\") + \", Position: \" + obj.get(\"start_position\") + \" & End nested object Brace: \" + obj.get(\"end_brace\") + \", Position: \" + obj.get(\"end_position\"));\r                   segment_info.put(\"log_segment\", segment);\r                   segments.add(segment_info);  // Store the segment information\r               }\r           }\r \r           // If necessary, set the results back in the event\r           // ctx.event.put(\"nested_objects\", segments);  // Uncomment this if you need to return segments\r           // ctx.event.put(\"current_chunk\", chunks);    // Uncomment this if you need to return chunks\r         
","lang":"painless","on_failure":[{"set":{"field":"scriptError","value":true}}]}},{"rename":{"ignore_missing":true,"field":"service","target_field":"app_service"}},{"rename":{"ignore_missing":true,"field":"source","target_field":"app_source"}},{"rename":{"ignore_missing":true,"field":"user","target_field":"app_user"}},{"gsub":{"pattern":"ms","ignore_missing":true,"field":"execTime","replacement":""}},{"convert":{"ignore_missing":true,"field":"execTime","type":"long"}},{"rename":{"ignore_missing":true,"field":"error","target_field":"app_error"}},{"rename":{"ignore_missing":true,"field":"file","target_field":"app_file"}},{"rename":{"ignore_missing":true,"field":"url","target_field":"app_url"}},{"rename":{"ignore_missing":true,"field":"metadata","target_field":"app_metadata"}},{"remove":{"field":"rest","on_failure":[{"set":{"field":"rest_remove_failure","value":true}}]}},{"rename":{"field":"transaction-id","target_field":"transaction_id","ignore_missing":true}}]}

And here is how the pipeline is referenced in the challenge, under challenges/default.json:

{
    "name": "my-challenge",
    "description": "My new challenge",
    "default": true,
    "schedule": [
    {
      "operation": "delete-index"
    },
    {
      "operation": {
        "operation-type": "create-index",
        "settings": {{index_settings | default({}) | tojson}}
      }
    },
    {
      "operation": {
        "operation-type": "bulk",
        "pipeline": "test-sps-pipeline",
        "bulk-size": {{bulk_size | default(5000)}},
        "ingest-percentage": {{ingest_percentage | default(100)}}
      },
      "clients": {{bulk_indexing_clients | default(8)}},
      "iterations": 106397,
      "target-throughput": 1230
    }
  ]
}

My assumption was that this configuration would keep re-ingesting the 60k records until all iterations completed, but it doesn't - it indexes them only once (approximately 58k records).
So if I create a track with the 1.6B-record index and add a parallel task to run for 24 hours, will it keep indexing documents into the cluster for the full 24 hours and benchmark it?
Here is the track.json file:

{
  "version": 2,
  "description": "Tracker-generated track for test-sps-track",
  "indices": [
    {
      "name": "rally_sps_logs"
    }
  ],
  "corpora": [
    {
      "name": ".ds-demo_sps_application_logs-8.17.1-2025.07.19-000002",
      "documents": [
        {
          "target-index": "rally_sps_logs",
          "source-file": "sps_logs.json.bz2",
          "document-count": 60001,
          "pipeline": "test-sps-pipeline"
        }
      ]
    }
  ],

  "operations": [{{ rally.collect(parts="operations/*.json") }}],
  "challenges": [{{ rally.collect(parts="challenges/*.json") }}]
}

I also tried to use a pipeline that already exists in the cluster instead of creating it from Esrally, but it throws a script error.

With Regards,
Abhishek M

What error do you get from the pipeline?

With regard to why it isn't looping - by default, each document will only be ingested once. You can set looped: true on the bulk operation so that the same documents are ingested over and over again.

Presuming your source data doesn't contain _id, this should allow you to loop. I suggest looping, say, 10 times and then validating that the index contains 60001 * 10 documents.
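As a sketch, your existing bulk task from default.json could be adjusted like this (values carried over from your challenge; whether you bound the run by iterations or by duration is up to you):

```json
{
  "operation": {
    "operation-type": "bulk",
    "pipeline": "test-sps-pipeline",
    "bulk-size": 5000,
    "looped": true
  },
  "clients": 8,
  "target-throughput": 1230
}
```

After the race, you can verify the document count with a simple count request against the target index, e.g. GET rally_sps_logs/_count, and check it matches the expected multiple of 60001.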