Hi,
I have a middle tier that continuously fetches data from a data source, does some processing, and then indexes the results into Elasticsearch. When the middle-tier machine (an Azure Service Fabric VM) is hot (CPU up to 95%), the bulk index response from Elasticsearch sometimes takes much longer: up to 30 seconds or more, compared with the usual 1 to 4 seconds. Sometimes I get a TaskCanceledException instead. I'm using .NET Framework and HttpClient to send the requests, and I set the timeout to 45 seconds.
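To tell whether a TaskCanceledException comes from HttpClient's own timeout, and to see how long each bulk call really takes, one option is to time the call and classify the failure. The sketch below assumes an async caller; `BulkTiming`, `TimedResult`, `ClassifyFailure` and `TimedPostAsync` are hypothetical names, not part of any library. It relies on the usual .NET behaviour that HttpClient reports its `Timeout` as a TaskCanceledException while the caller's own cancellation token stays uncancelled.

```csharp
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result type for one timed bulk request.
public sealed class TimedResult
{
    public bool Ok { get; }
    public TimeSpan Elapsed { get; }
    public string Failure { get; } // null when the request completed

    public TimedResult(bool ok, TimeSpan elapsed, string failure)
    {
        Ok = ok;
        Elapsed = elapsed;
        Failure = failure;
    }
}

// Hypothetical diagnostic helper, not an existing API.
public static class BulkTiming
{
    // A TaskCanceledException while the caller's token is NOT cancelled
    // most likely means HttpClient.Timeout expired.
    public static string ClassifyFailure(Exception ex, CancellationToken callerToken)
    {
        if (ex is TaskCanceledException)
        {
            return callerToken.IsCancellationRequested ? "caller-cancelled" : "client-timeout";
        }
        if (ex is HttpRequestException)
        {
            return "transport-error";
        }
        return "other";
    }

    // Times one POST with wall-clock time, including any time spent waiting
    // for a thread to run the continuation.
    public static async Task<TimedResult> TimedPostAsync(
        HttpClient client, string url, HttpContent content, CancellationToken callerToken)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            using (var response = await client.PostAsync(url, content, callerToken).ConfigureAwait(false))
            {
                return new TimedResult(response.IsSuccessStatusCode, sw.Elapsed, null);
            }
        }
        catch (Exception ex)
        {
            return new TimedResult(false, sw.Elapsed, ClassifyFailure(ex, callerToken));
        }
    }
}
```

Logging `Elapsed` next to the machine's CPU usage would show whether the slow calls line up with the client being saturated. Because `Stopwatch` measures wall time, a large value under high client CPU does not by itself prove that Elasticsearch was slow.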
Does anyone know what may be the cause?
Is it a networking issue caused by the high CPU usage on the client machine?
Or is it because the Elasticsearch cluster is too busy? CPU usage on some Elasticsearch data nodes goes above 90% when the exception occurs, but the exception does not always happen when a data node has high CPU usage.
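One way to check the cluster side during a slow period is to poll the cat APIs for node CPU and for the write thread pool's queue and rejection counts; a growing queue or non-zero `rejected` would point at the cluster rather than the client. This is a minimal sketch assuming Elasticsearch 6.3 or later, where bulk requests run on the `write` pool (older versions call it `bulk`); `ClusterProbe`, `BuildProbeUrls` and `DumpAsync` are hypothetical names.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper that prints cluster load via the cat APIs.
public static class ClusterProbe
{
    // Assumes ES 6.3+ ("write" thread pool); use "bulk" on older versions.
    public static string[] BuildProbeUrls(string baseUrl)
    {
        string root = baseUrl.TrimEnd('/');
        return new[]
        {
            root + "/_cat/nodes?v&h=name,cpu,load_1m,heap.percent",
            root + "/_cat/thread_pool/write?v&h=node_name,name,active,queue,rejected",
        };
    }

    // Fetches each cat endpoint and writes the plain-text table to the console.
    public static async Task DumpAsync(HttpClient client, string baseUrl)
    {
        foreach (var url in BuildProbeUrls(baseUrl))
        {
            string body = await client.GetStringAsync(url).ConfigureAwait(false);
            Console.WriteLine(url);
            Console.WriteLine(body);
        }
    }
}
```

Running this every few seconds alongside the bulk loop makes it easier to line up client-side latency spikes with cluster-side queueing.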
Thanks in advance for your help. Here is my code sample:
```csharp
public void MyTask()
{
    using (HttpClient httpClient = new HttpClient(new WebRequestHandler()))
    {
        httpClient.DefaultRequestHeaders.TransferEncodingChunked = false;
        httpClient.DefaultRequestHeaders.Add("Connection", "keep-alive");
        httpClient.Timeout = TimeSpan.FromSeconds(30);
        while (true)
        {
            // fetch items from the in-memory queue; these items are already pre-processed
            var logLines = FetchLogLines();
            List<Task<long>> sendTasks = new List<Task<long>>();
            sendTasks.Add(Task.Run(() => SendBulkInsertRequest(httpClient, logLines, IndexName)));
            Task.WaitAll(sendTasks.ToArray());
        }
    }
}

public long SendBulkInsertRequest(HttpClient httpClient, List<string> loglines, string indexName)
{
    // generate the bulk body: an action line followed by the document, each ending in "\n"
    StringBuilder sb = new StringBuilder();
    string header = "{\"index\":{}}";
    foreach (var log in loglines)
    {
        sb.Append(header).Append("\n");
        sb.Append(log).Append("\n");
    }
    var postBody = sb.ToString();
    // generate request url
    string indexUrl = GetIndexUrl();
    int sentCount = 0;
    // send index request (blocks this thread until the response arrives)
    try
    {
        using (var content = new StringContent(postBody, Encoding.UTF8, "application/json"))
        using (HttpResponseMessage result = httpClient.PostAsync(indexUrl, content).Result)
        {
            sentCount = result.IsSuccessStatusCode ? loglines.Count : 0;
        }
    }
    catch (Exception ex)
    {
        this.Logger($"IndexTask-{taskId}: exception while bulk insert to url: {indexUrl}", ex);
        sentCount = 0;
    }
    return sentCount;
}
```
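For comparison, the same loop can be written without blocking. The code above runs `SendBulkInsertRequest` on a thread-pool thread via `Task.Run` and then blocks that thread on `.Result`; one possible factor (not confirmed here) is that when the machine's CPU is saturated the thread pool can be slow to schedule work, which may show up as extra latency or as HttpClient timeouts. The sketch below awaits the request instead. `BulkSender`, `BuildBulkBody`, `SendBulkInsertRequestAsync`, `RunAsync` and the `fetchLogLines` delegate are hypothetical stand-ins for the original methods, and the `application/x-ndjson` content type is the one the Elasticsearch bulk documentation describes (the original `application/json` is also accepted).

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async version of the original send loop.
public static class BulkSender
{
    // Builds the bulk body: an action line plus a source line per document,
    // each terminated by "\n" (the bulk API needs the final newline too).
    public static string BuildBulkBody(IEnumerable<string> docs)
    {
        var sb = new StringBuilder();
        foreach (var doc in docs)
        {
            sb.Append("{\"index\":{}}").Append('\n');
            sb.Append(doc).Append('\n');
        }
        return sb.ToString();
    }

    // Awaits the POST instead of blocking on .Result, so no thread is held
    // while the request is in flight. Exceptions propagate to the caller.
    public static async Task<long> SendBulkInsertRequestAsync(
        HttpClient httpClient, IReadOnlyList<string> logLines, string indexUrl, CancellationToken token)
    {
        using (var content = new StringContent(BuildBulkBody(logLines), Encoding.UTF8, "application/x-ndjson"))
        using (var response = await httpClient.PostAsync(indexUrl, content, token).ConfigureAwait(false))
        {
            // A 2xx status means the request was accepted; individual items can
            // still fail, in which case the response body contains "errors": true.
            return response.IsSuccessStatusCode ? logLines.Count : 0;
        }
    }

    // Replaces the while(true) + Task.Run + Task.WaitAll loop.
    public static async Task RunAsync(
        HttpClient httpClient, Func<IReadOnlyList<string>> fetchLogLines, string indexUrl, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            IReadOnlyList<string> logLines = fetchLogLines();
            try
            {
                await SendBulkInsertRequestAsync(httpClient, logLines, indexUrl, token).ConfigureAwait(false);
            }
            catch (Exception ex) when (!token.IsCancellationRequested)
            {
                // Log and continue, like the original catch block; swap in real logging.
                Console.Error.WriteLine(ex);
            }
        }
    }
}
```

Because only one batch is in flight at a time in both versions, the throughput is the same; the difference is that no thread sits blocked while waiting for Elasticsearch.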