Here is my `logstash.conf` file:
```
input {
  http {
    host => "127.0.0.1"
    port => 31311
    ssl => false
  }
}
filter {
  mutate {
    split => { "[headers][request_path]" => "/" }
    add_field => { "index_id" => "%{[headers][request_path][1]}" }
  }
  ruby {
    code => "event.set('request_path_length', event.get('[headers][request_path]').length)"
  }
  if [request_path_length] == 3 {
    mutate {
      add_field => { "document_id" => "%{[headers][request_path][2]}" }
    }
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
  if [request_path_length] == 3 {
    elasticsearch {
      hosts => "http://localhost:9200"
      index => "%{index_id}"
      document_id => "%{document_id}"
    }
  }
  else {
    elasticsearch {
      hosts => "http://localhost:9200"
      index => "%{index_id}"
    }
  }
}
```
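To make the path handling in the filter explicit, here is a hypothetical C# helper that mirrors it, assuming `mutate`'s `split` behaves like Ruby's `String#split` for a path such as `/Book/1`: the leading slash produces an empty first element, so the index name is element 1 and a document id is only present when the array has three elements. `RequestPathParser` is an illustrative name, not part of Logstash.

```csharp
using System;

// Hypothetical helper that mirrors the mutate/ruby filter logic above in C#.
public static class RequestPathParser
{
    // Returns (IndexId, DocumentId); DocumentId is null when the path has no id segment.
    public static (string IndexId, string? DocumentId) Parse(string requestPath)
    {
        // "/Book/1".Split('/') yields ["", "Book", "1"]: the leading slash gives an
        // empty first element, matching the [1] and length == 3 checks in the config.
        string[] parts = requestPath.Split('/');
        string indexId = parts.Length > 1 ? parts[1] : "";
        string? documentId = parts.Length == 3 ? parts[2] : null;
        return (indexId, documentId);
    }
}
```

`RequestPathParser.Parse("/Book/1")` returns `("Book", "1")`, and `Parse("/Book")` returns `("Book", null)`, which corresponds to the `else` branch of the output section.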
My remote VM, which also hosts my Elasticsearch and Logstash servers, listens on port 8080.
From my local machine, I periodically send zipped folders (containing JSON documents) over TCP to my remote server, which receives the data into a memory stream, unzips the folders, and sends the contents to Logstash. Logstash in turn forwards the data to Elasticsearch.
I am currently testing the workflow with some dummy data.
On my remote server, here is the method for receiving data over TCP:
```csharp
private static void ReceiveAndUnzipElasticSearchDocumentFolder(int numBytesExpectedToReceive)
{
    int numBytesLeftToReceive = numBytesExpectedToReceive;
    using (MemoryStream zippedFolderStream = new MemoryStream(new byte[numBytesExpectedToReceive]))
    {
        while (numBytesLeftToReceive > 0)
        {
            // Receive data in small packets
        }
        zippedFolderStream.Unzip(afterReadingEachDocument: LogStashDataSender.Send);
    }
}
```
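The receive loop body is elided above. One possible shape for it, assuming the bytes arrive through a `Stream` such as a `NetworkStream`: `Stream.Read` may return fewer bytes than requested (and 0 at end of stream), so the loop has to keep reading until the expected count is reached. `ReceiveHelpers`, `ReceiveExactly` and its `chunkSize` parameter are hypothetical names.

```csharp
using System;
using System.IO;

public static class ReceiveHelpers
{
    // Hypothetical helper: copies exactly `count` bytes from `source` into a new
    // MemoryStream, reading in chunks because one Read call may return fewer bytes.
    public static MemoryStream ReceiveExactly(Stream source, int count, int chunkSize = 4096)
    {
        var buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int read = source.Read(buffer, offset, Math.Min(chunkSize, count - offset));
            if (read == 0)
                throw new EndOfStreamException($"Expected {count} bytes but received only {offset}.");
            offset += read;
        }
        // A MemoryStream wrapping an existing buffer starts at position 0.
        return new MemoryStream(buffer, writable: false);
    }
}
```

The returned stream is already positioned at the start and could be handed to `Unzip` in place of the pre-allocated `MemoryStream`.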
Here is the code for unzipping the received folder:
```csharp
public static class StreamExtensions
{
    public static void Unzip(this Stream zippedElasticSearchDocumentFolderStream, Action<ElasticSearchJsonDocument> afterReadingEachDocument)
    {
        JsonSerializer jsonSerializer = new JsonSerializer();
        foreach (ZipArchiveEntry entry in new ZipArchive(zippedElasticSearchDocumentFolderStream).Entries)
        {
            using (JsonTextReader jsonReader = new JsonTextReader(new StreamReader(entry.Open())))
            {
                dynamic jsonObject = jsonSerializer.Deserialize<ExpandoObject>(jsonReader);
                string jsonIndexId = jsonObject.IndexId;
                string jsonDocumentId = jsonObject.DocumentId;
                afterReadingEachDocument(new ElasticSearchJsonDocument(jsonObject, jsonIndexId, jsonDocumentId));
            }
        }
    }
}
```
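Because `Unzip` takes an `Action<ElasticSearchJsonDocument>`, it has no way to wait for an asynchronous callback. A minimal sketch of a Task-aware variant, assuming the callback is changed to return `Task`: the hypothetical `UnzipAsync` awaits each callback before reading the next entry, and disposes the `ZipArchive`, which the original never does. To stay self-contained it passes the entry name and raw JSON text rather than an `ElasticSearchJsonDocument`, so deserialization (for example with Json.NET) would move into the callback.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

public static class AsyncZipExtensions
{
    // Hypothetical async counterpart of Unzip: each callback is awaited before the
    // next entry is read, so no send is left running unobserved when this completes.
    public static async Task UnzipAsync(this Stream zipStream, Func<string, string, Task> afterReadingEachDocument)
    {
        // leaveOpen: true so the caller's `using` block still owns zipStream.
        using var archive = new ZipArchive(zipStream, ZipArchiveMode.Read, leaveOpen: true);
        foreach (ZipArchiveEntry entry in archive.Entries)
        {
            string json;
            using (var reader = new StreamReader(entry.Open()))
            {
                json = await reader.ReadToEndAsync();
            }
            await afterReadingEachDocument(entry.FullName, json);
        }
    }
}
```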
And here is the method for sending data to LogStash:
```csharp
public static async void Send(ElasticSearchJsonDocument document)
{
    HttpResponseMessage response =
        await httpClient.PutAsJsonAsync(
            IsNullOrWhiteSpace(document.DocumentId)
                ? $"{document.IndexId}"
                : $"{document.IndexId}/{document.DocumentId}",
            document.JsonObject);
    try
    {
        response.EnsureSuccessStatusCode();
    }
    catch (Exception exception)
    {
        Console.WriteLine(exception.Message);
    }
    Console.WriteLine($"{response.Content}");
}
```
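A minimal sketch of a Task-returning sender, assuming the JSON has already been serialized to a string (the original uses `PutAsJsonAsync`, whose serializer depends on the package in use). It builds the same relative path as `Send`, awaits the PUT, reads the body with `ReadAsStringAsync` (interpolating `response.Content` only prints the content object's type name), and lets `EnsureSuccessStatusCode` throw to the awaiting caller. `LogStashSenderSketch`, `SendAsync` and the `StubHandler` test double are hypothetical.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class LogStashSenderSketch
{
    // Returns Task<string>, so callers can await it and see exceptions at the await.
    public static async Task<string> SendAsync(HttpClient httpClient, string indexId, string? documentId, string json)
    {
        string path = string.IsNullOrWhiteSpace(documentId) ? indexId : $"{indexId}/{documentId}";
        using var content = new StringContent(json, Encoding.UTF8, "application/json");
        using HttpResponseMessage response = await httpClient.PutAsync(path, content);
        string body = await response.Content.ReadAsStringAsync(); // the actual response text
        response.EnsureSuccessStatusCode(); // throws HttpRequestException on a non-2xx status
        return body;
    }
}

// Test double so the sketch can run without a Logstash instance.
public sealed class StubHandler : HttpMessageHandler
{
    public HttpRequestMessage? LastRequest { get; private set; }

    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        LastRequest = request;
        return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("ok") });
    }
}
```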
The `httpClient` referenced in the `public static async void Send(ElasticSearchJsonDocument document)` method was created using the following code:
```csharp
private const string LogStashHostAddress = "http://127.0.0.1";
private const int LogStashPort = 31311;

httpClient = new HttpClient { BaseAddress = new Uri($"{LogStashHostAddress}:{LogStashPort}/") };
httpClient.DefaultRequestHeaders.Accept.Clear();
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
```
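With this `BaseAddress`, `HttpClient` resolves a relative path such as `Book/1` against it the way `new Uri(baseUri, relativeUri)` does, so the request should reach the Logstash `http` input with a request path of `/Book/1`, which the filter then splits. A trailing slash matters only when the base address has a path of its own: against `http://127.0.0.1:31311/api`, `Book/1` replaces `api`. `UriSketch.Resolve` is a hypothetical helper for checking this.

```csharp
using System;

public static class UriSketch
{
    // Combines a base address and a relative path using standard URI resolution,
    // which is how HttpClient resolves relative request URIs against BaseAddress.
    public static string Resolve(string baseAddress, string relativePath) =>
        new Uri(new Uri(baseAddress), relativePath).ToString();
}
```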
When I step into a new debug instance, the program runs smoothly, but dies immediately after executing `await httpClient.PutAsJsonAsync` for each of the documents contained in the zipped folder: `response.EnsureSuccessStatusCode();` is never hit, and neither is `Console.WriteLine(exception.Message);` nor `Console.WriteLine($"{response.Content}");`.
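A likely explanation (an assumption, since `Main` is not shown) is that `Send` is `async void`. `Unzip` invokes it through an `Action<ElasticSearchJsonDocument>`, so each call returns to the loop as soon as the HTTP request reaches its first incomplete `await`. The loop finishes, the `using` block ends, and if the program then exits, the code after the `await` never runs. Exceptions thrown inside an `async void` method also cannot be caught by its caller. The hypothetical `FireAndForgetDemo` uses a `TaskCompletionSource` in place of the HTTP call to show the difference between `async void` and `async Task`:

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetDemo
{
    // Same shape as Send: `async void`, so the caller gets nothing to wait on.
    // Control returns to the caller at the first incomplete await; the rest of
    // the method runs later, and only if the process is still alive.
    public static async void SendAsyncVoid(TaskCompletionSource<bool> pendingRequest, Action log)
    {
        await pendingRequest.Task; // stands in for `await httpClient.PutAsJsonAsync(...)`
        log();
    }

    // Task-returning version: the caller can await completion and observe exceptions.
    public static async Task SendAsyncTask(TaskCompletionSource<bool> pendingRequest, Action log)
    {
        await pendingRequest.Task;
        log();
    }
}
```

With `async Task`, the usual approach is to propagate it upward: return `Task` from `Send`, await it from an async version of `Unzip`, and await that from an `async Task Main` (supported since C# 7.1).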
Here is an example of `ElasticSearchJsonDocument` that is passed to the `public static async void Send(ElasticSearchJsonDocument document)` method:
When I ran the same `PUT` request using cURL, the `Book` index was successfully created, and I could then send a `GET` request to retrieve the data from Elasticsearch.
My questions are:

- Why did the program die immediately (with no visible exception messages) after executing `await httpClient.PutAsJsonAsync(...)` for each of the JSON documents inside the received zipped folder?
- What changes should I make to ensure that I can make successful `PUT` requests to Logstash using an `HttpClient` instance?