.NET-specific: PUT requests to LogStash fail when sent using HttpClient, succeed when sent using cURL

Here is my logstash.conf file:

input {
	http {
		host => "127.0.0.1"
		port => 31311 
		ssl => false
	}
}

filter {
	mutate {
		split => { "[headers][request_path]" => "/"}
		add_field => { "index_id" => "%{[headers][request_path][1]}" }
	}
	
	ruby { 
		code => "event.set('request_path_length', event.get('[headers][request_path]').length)" 
	}
	
	if [request_path_length] == 3 {
		mutate {
			add_field => { "document_id" => "%{[headers][request_path][2]}" }
		}
	}
}
	
output {
	stdout {
		codec => "rubydebug"
	}
  
	if [request_path_length] == 3 {
		elasticsearch {
			hosts => "http://localhost:9200"
			index => "%{index_id}"
			document_id => "%{document_id}"
		}
	}
	else {
		elasticsearch {
			hosts => "http://localhost:9200"
			index => "%{index_id}"
		}
	}
}
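
To make the filter logic concrete, here is a minimal C# sketch of how a request path maps to index_id and document_id. The class and method names are hypothetical, and it assumes (as the [1] and [2] indices and the length check of 3 in the config imply) that splitting a path with a leading "/" keeps an empty first element.

```csharp
using System;

public static class RequestPathSketch
{
    // Splits on "/" and keeps the empty element produced by the leading slash,
    // so "/book/1" becomes ["", "book", "1"].
    public static string[] SplitPath(string requestPath) => requestPath.Split('/');

    // Element 1 is what the config stores in index_id.
    public static string IndexId(string requestPath) => SplitPath(requestPath)[1];

    // Element 2 is used as document_id only when the path has exactly three parts.
    public static string DocumentId(string requestPath)
    {
        string[] parts = SplitPath(requestPath);
        return parts.Length == 3 ? parts[2] : null;
    }
}
```

With this sketch, "/book/1" gives an index of "book" and a document id of "1", while "/book" gives an index of "book" and no document id, which corresponds to the else branch of the output section.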

My remote VM, which also hosts my ElasticSearch and LogStash servers, listens on Port 8080.

On my local machine, I periodically send zipped folders (containing JSON documents) over TCP to my remote server, which receives the data into a memory stream, unzips the folders, and sends the contents to LogStash. LogStash in turn forwards the data to ElasticSearch.

I am currently testing the workflow with some dummy data.

On my remote server, here is the method for receiving data over TCP:

private static void ReceiveAndUnzipElasticSearchDocumentFolder(int numBytesExpectedToReceive)
{
	int numBytesLeftToReceive = numBytesExpectedToReceive;

	using (MemoryStream zippedFolderStream = new MemoryStream(new byte[numBytesExpectedToReceive]))
	{
		while (numBytesLeftToReceive > 0)
		{
			// Receive data in small packets
		}

		zippedFolderStream.Unzip(afterReadingEachDocument: LogStashDataSender.Send);
	}
}
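
The receive loop is elided above. As a rough illustration only, a loop that reads a known number of bytes from any Stream could look like the sketch below. StreamReadSketch and ReadFully are hypothetical names, and this is not the code from my project.

```csharp
using System;
using System.IO;

public static class StreamReadSketch
{
    // Reads exactly 'count' bytes from 'source', looping because a single
    // Read call may return fewer bytes than requested.
    public static byte[] ReadFully(Stream source, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;

        while (offset < count)
        {
            int read = source.Read(buffer, offset, count - offset);
            if (read == 0)
            {
                // The sender closed the connection before all bytes arrived.
                throw new EndOfStreamException(
                    $"Expected {count} bytes but received only {offset}.");
            }
            offset += read;
        }

        return buffer;
    }
}
```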

Here is the code for unzipping the received folder:

public static class StreamExtensions
{
    public static void Unzip(this Stream zippedElasticSearchDocumentFolderStream, Action<ElasticSearchJsonDocument> afterReadingEachDocument)
    {
        JsonSerializer jsonSerializer = new JsonSerializer();

        foreach (ZipArchiveEntry entry in new ZipArchive(zippedElasticSearchDocumentFolderStream).Entries)
        {
            using (JsonTextReader jsonReader = new JsonTextReader(new StreamReader(entry.Open())))
            {
                dynamic jsonObject = jsonSerializer.Deserialize<ExpandoObject>(jsonReader);

                string jsonIndexId = jsonObject.IndexId;
                string jsonDocumentId = jsonObject.DocumentId;

                afterReadingEachDocument(new ElasticSearchJsonDocument(jsonObject, jsonIndexId, jsonDocumentId));
            }
        }
    }
}
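
For anyone who wants to try the unzip step without the TCP part, the following sketch builds a zip archive in memory and reads its first entry back using System.IO.Compression. The class name, method names, and sample entry are hypothetical and exist only for illustration.

```csharp
using System;
using System.IO;
using System.IO.Compression;

public static class ZipRoundTripSketch
{
    // Creates an in-memory zip archive containing one text entry.
    public static byte[] CreateZip(string entryName, string json)
    {
        using (var buffer = new MemoryStream())
        {
            // leaveOpen keeps 'buffer' usable after the archive is disposed;
            // disposing the archive is what writes the zip directory.
            using (var archive = new ZipArchive(buffer, ZipArchiveMode.Create, leaveOpen: true))
            {
                ZipArchiveEntry entry = archive.CreateEntry(entryName);
                using (var writer = new StreamWriter(entry.Open()))
                {
                    writer.Write(json);
                }
            }
            return buffer.ToArray();
        }
    }

    // Opens the archive for reading and returns the text of its first entry.
    public static string ReadFirstEntry(byte[] zipBytes)
    {
        using (var archive = new ZipArchive(new MemoryStream(zipBytes), ZipArchiveMode.Read))
        using (var reader = new StreamReader(archive.Entries[0].Open()))
        {
            return reader.ReadToEnd();
        }
    }
}
```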

And here is the method for sending data to LogStash:

public static async void Send(ElasticSearchJsonDocument document)
{
	HttpResponseMessage response = 
		await httpClient.PutAsJsonAsync(
			IsNullOrWhiteSpace(document.DocumentId) 
				? $"{document.IndexId}" 
				: $"{document.IndexId}/{document.DocumentId}",
			document.JsonObject);

	try
	{
		response.EnsureSuccessStatusCode();
	}
	catch (Exception exception)
	{
		Console.WriteLine(exception.Message);
	}

	Console.WriteLine($"{response.Content}");
}

The httpClient referenced in the public static async void Send(ElasticSearchJsonDocument document) method was created using the following code:

private const string LogStashHostAddress = "http://127.0.0.1";
private const int LogStashPort = 31311;

httpClient = new HttpClient { BaseAddress = new Uri($"{LogStashHostAddress}:{LogStashPort}/") };
httpClient.DefaultRequestHeaders.Accept.Clear();
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
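
As far as I understand, HttpClient resolves a relative request URI against BaseAddress using the standard URI resolution rules. The sketch below uses the Uri(Uri, string) constructor to show those rules. The "api" path segment and the class name are hypothetical and are not part of my setup.

```csharp
using System;

public static class BaseAddressSketch
{
    // Combines a base address and a relative path the way the Uri class does.
    public static string Resolve(string baseAddress, string relativePath)
    {
        return new Uri(new Uri(baseAddress), relativePath).AbsoluteUri;
    }
}
```

With a base of "http://127.0.0.1:31311/" and a relative path of "book/1", Resolve returns "http://127.0.0.1:31311/book/1". Two cases are easy to trip over: a base without a trailing slash ("http://127.0.0.1:31311/api") loses its last segment, and a relative path with a leading slash ("/book/1") replaces the whole path of the base. Both produce "http://127.0.0.1:31311/book/1" rather than a URL under "api/".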

When I step through the program in a debugger, it runs smoothly until it executes await httpClient.PutAsJsonAsync for each of the documents in the zipped folder, and then it dies immediately. response.EnsureSuccessStatusCode(); is never hit, and neither is Console.WriteLine(exception.Message); or Console.WriteLine($"{response.Content}");.
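
One thing that may be related to this symptom, though I have not confirmed it is the cause here: Send is declared async void. An async void method returns to its caller at the first incomplete await, the caller has no Task to wait on, and an exception thrown after that point cannot be caught by the caller. The sketch below shows the async Task shape instead. FakePutAsync is a hypothetical stand-in for the HTTP call and only simulates latency.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncSendSketch
{
    // Hypothetical stand-in for httpClient.PutAsJsonAsync; no network involved.
    private static async Task<string> FakePutAsync(string path)
    {
        await Task.Delay(50);
        return $"200 OK for {path}";
    }

    // Returning Task<string> instead of void lets the caller await completion
    // and observe any exception thrown inside the method.
    public static async Task<string> SendAsync(string path)
    {
        string result = await FakePutAsync(path);
        return result;
    }
}
```

A caller would write await AsyncSendSketch.SendAsync("book/1") and only continue, or exit, once the request has finished.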

Here is an example of ElasticSearchJsonDocument that is passed to the public static async void Send(ElasticSearchJsonDocument document) method:

(Screenshot of the example document; image not available.)

When I ran the same PUT request using cURL, the Book index was successfully created, and I could then send a GET request to retrieve the data from ElasticSearch.

My questions are:

  1. Why did the program die immediately (with no visible exception messages) after executing await httpClient.PutAsJsonAsync(...) for each of the JSON documents inside the received zipped folder?
  2. What changes should I make to ensure that I can make successful PUT requests to LogStash using an HttpClient instance?

I changed my httpClient instantiation code from

httpClient = new HttpClient { BaseAddress = new Uri($"{LogStashHostAddress}:{LogStashPort}/") };
httpClient.DefaultRequestHeaders.Accept.Clear();
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

to

httpClient = new HttpClient();
httpClient.DefaultRequestHeaders.Accept.Clear();
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

And I changed await httpClient.PutAsJsonAsync(...) to

HttpResponseMessage response =                
	await httpClient.PutAsJsonAsync(
		IsNullOrWhiteSpace(document.DocumentId)
			? $"{LogStashHostAddress}:{LogStashPort}/{document.IndexId}"
			: $"{LogStashHostAddress}:{LogStashPort}/{document.IndexId}/{document.DocumentId}",
		document.JsonObject);

response.EnsureSuccessStatusCode();

It turns out that the BaseAddress property of HttpClient is extremely user-unfriendly, so instead of spending more time on it, I decided to eliminate it entirely and pass the full URL with each request.
