Forwarding HTTP input data to HTTP output data

Hello there.

Moving from Elastic on-prem to Elastic Cloud, I am trying to use Logstash as a "store and forward" layer for creating documents in Elasticsearch. In practice, I was previously writing directly to Elasticsearch; now I want Logstash to collect my calls and execute them, taking advantage of the persistent queue so that I don't lose data during an internet outage. Logstash should wait for the connection to be re-established instead of failing.
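(For context: as far as I understand, the persistent queue is enabled in logstash.yml with settings along these lines; the values below are only illustrative, not what I will use in production.)

# logstash.yml (illustrative values)
queue.type: persisted          # buffer events on disk instead of only in memory
queue.max_bytes: 1gb           # upper bound on disk space used by the queue
path.queue: D:/logstash-7.17.28/data/queue    # where the queue pages are written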

Before, I was successfully writing to Elasticsearch by sending POST requests to "http://myelasticinstance:9200/_bulk" with my "body" (which contains the index actions and the documents).

Now, I want Logstash to receive my HTTP call through the http input plugin and forward the same "body" to "http://myelasticinstance:9200/_bulk".

How can I use "headers.request_path", "headers.request_method", "headers.content_type" and "message" as parameters of my http output plugin?

I guess it is something like this, but it gives me an error (I can see that the call is received correctly because I am also sending the output to a file):

input {
    http {
        port => 5043
    }
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
    }
    http {
        http_method => "headers.request_method"
        url => "https://myelasticcloudinstance/" + "headers.request_path"
        content_type => "headers.content_type"
        message => "message"
    }
}

Thanks.

Making some progress. Now, with this config:

input {
	http {
		port => 5043
	}
}
filter {
	mutate {
		add_field => { 
			"full_url" => "https://esposito.free.beeceptor.com%{[headers][request_path]}" 
		} 
		add_field => { 
			"original_content_type" => "%{[headers][content_type]}"
		}
		add_field => { 
			"original_path" => "%{[headers][request_path]}"
		}
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
    }
}

I have this result, saved to file:

{
    "@version": "1",
    "message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-07T19:14:58.400\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Gandalf\",\"LASTNAME\":\"the Grey\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-07T19:14:58.400\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Gandalf\",\"LASTNAME\":\"the Grey\"}}\r\n",
    "full_url": "https://esposito.free.beeceptor.com/_bulk",
    "original_path": "/_bulk",
    "host": "127.0.0.1",
    "headers": {
        "http_user_agent": "Mozilla/4.0 (compatible; Clever Internet Suite)",
        "http_host": "localhost:5043",
        "content_type": "application/x-ndjson; charset=utf-8",
        "request_path": "/_bulk",
        "http_accept": "*/*",
        "connection": "Keep-Alive",
        "request_method": "POST",
        "cache_control": "no-cache",
        "http_version": "HTTP/1.1",
        "content_length": "781",
        "accept_encoding": "gzip"
    },
    "@timestamp": "2025-03-07T19:14:58.522Z",
    "original_content_type": "application/x-ndjson; charset=utf-8"
}

Unfortunately, my "message" contains escape characters, and it looks like I cannot use "codec => json" because my input body is NDJSON containing four separate JSON objects (two bulk action lines and two documents):

{ "index" : { "_index" : "journaling_insert"}}

{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}

{ "index" : { "_index" : "journaling_insert"}}

{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}

How can I remove all those escape characters?

Thanks in advance.

Hello and welcome,

Why do you want to use the http output plugin instead of the elasticsearch output plugin?

If your destination is an Elasticsearch instance, use the correct plugin; it will be easier.

Hello and thank you for the answer.

I tried it at the beginning, but sending my request for a group of documents to the /_bulk URL did not seem to work.
That is the reason for not using the standard elasticsearch plugin.

Do you have a suggestion, looking at the body that I send? Is it possible to send a bulk body like mine through the elasticsearch plugin?

What was not working? The elasticsearch output already uses the bulk API. Do you have any error logs?

I have no idea what the body would look like; I have never seen anyone use the http output to send data to Elasticsearch, since the elasticsearch output already does that and uses the bulk API, as you want.

Can you try to change to the elasticsearch output and share the errors you get?

Okay, I changed my conf file as below:

input {
	http {
		port => 5043
	}
}
filter {
	mutate {
		add_field => { 
			"full_url" => "https://esposito.free.beeceptor.com%{[headers][request_path]}" 
		} 
		add_field => { 
			"original_content_type" => "%{[headers][content_type]}"
		}
		add_field => { 
			"original_path" => "%{[headers][request_path]}"
		}
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
	elasticsearch {
		hosts => "localhost:9200"
	}
# 	http {
#		http_method=>"post"
#		url => "%{[full_url]}"
#		message => "%{[parsed_message]}"
#	}
}

I actually don't see any error in my console:

D:\logstash-7.17.28\bin>logstash.bat -f D:\logstash-7.17.28\config\http-logstash.conf
"Using bundled JDK: D:\logstash-7.17.28\jdk\bin\java.exe"
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to D:/logstash-7.17.28/logs which is now configured via log4j2.properties
[2025-03-10T09:55:51,080][INFO ][logstash.runner          ] Log4j configuration path used is: D:\logstash-7.17.28\config\log4j2.properties
[2025-03-10T09:55:51,100][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.17.28", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.26+4 on 11.0.26+4 +indy +jit [mswin32-x86_64]"}
[2025-03-10T09:55:51,103][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[2025-03-10T09:55:51,256][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2025-03-10T09:55:54,700][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2025-03-10T09:55:56,497][INFO ][org.reflections.Reflections] Reflections took 135 ms to scan 1 urls, producing 119 keys and 419 values
[2025-03-10T09:55:58,566][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2025-03-10T09:55:59,058][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2025-03-10T09:55:59,383][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2025-03-10T09:55:59,400][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch version determined (7.9.1) {:es_version=>7}
[2025-03-10T09:55:59,405][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2025-03-10T09:55:59,499][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2025-03-10T09:55:59,503][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2025-03-10T09:55:59,563][INFO ][logstash.outputs.elasticsearch][main] Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[2025-03-10T09:55:59,614][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["D:/logstash-7.17.28/config/http-logstash.conf"], :thread=>"#<Thread:0x22aaec0d run>"}
[2025-03-10T09:56:00,550][INFO ][logstash.outputs.elasticsearch][main] Created rollover alias {:name=>"<logstash-{now/d}-000001>"}
[2025-03-10T09:56:00,608][INFO ][logstash.outputs.elasticsearch][main] Installing ILM policy {"policy"=>{"phases"=>{"hot"=>{"actions"=>{"rollover"=>{"max_size"=>"50gb", "max_age"=>"30d"}}}}}} {:name=>"logstash-policy"}
[2025-03-10T09:56:01,465][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>1.84}
[2025-03-10T09:56:01,772][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2025-03-10T09:56:01,790][INFO ][logstash.inputs.http     ][main][86ababf52fca957ddd549b8c0d4312337bf7118ad8cd7723b2c6c49429fa658a] Starting http input listener {:address=>"0.0.0.0:5043", :ssl=>"false"}
[2025-03-10T09:56:01,864][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2025-03-10T09:56:29,994][INFO ][logstash.outputs.file    ][main][22a64179258abdabb57f682df728d676d90fa80e2b094809f3ac162c2228c601] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-10_13.56.29.781.log"}
[2025-03-10T09:56:46,843][INFO ][logstash.outputs.file    ][main][22a64179258abdabb57f682df728d676d90fa80e2b094809f3ac162c2228c601] Closing file D:/log_streaming/my_app/log-2025-03-10_13.56.29.781.log
[2025-03-10T10:00:29,431][INFO ][logstash.outputs.file    ][main][22a64179258abdabb57f682df728d676d90fa80e2b094809f3ac162c2228c601] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-10_14.00.29.312.log"}
[2025-03-10T10:00:46,834][INFO ][logstash.outputs.file    ][main][22a64179258abdabb57f682df728d676d90fa80e2b094809f3ac162c2228c601] Closing file D:/log_streaming/my_app/log-2025-03-10_14.00.29.312.log
[2025-03-10T10:02:39,308][INFO ][logstash.outputs.file    ][main][22a64179258abdabb57f682df728d676d90fa80e2b094809f3ac162c2228c601] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-10_14.02.39.193.log"}
[2025-03-10T10:02:56,827][INFO ][logstash.outputs.file    ][main][22a64179258abdabb57f682df728d676d90fa80e2b094809f3ac162c2228c601] Closing file D:/log_streaming/my_app/log-2025-03-10_14.02.39.193.log

I can see the output file in /log_streaming/my_app/ with my request written, but I don't see any documents written to my index. I don't even see any relevant log entries in the logstash\logs\ folder.
Am I missing some parameter in the elasticsearch plugin configuration?

Can you elaborate on how you were doing that? I mean, are you using an off-the-shelf third-party application or code you created yourself? If the latter, some details on that code would help (e.g. Java application, Python, whatever, ...).

The reason I ask is that the problem report is a little strange. You were using Elasticsearch OK, I assume, as you wrote:

So your docs were being indexed correctly. You want to put Logstash in the middle, which is fine, but I don't see how Logstash (correctly configured, with the elasticsearch output) could fail to index exactly the same documents that were being indexed when sent directly. So I'm guessing there's an important detail we don't know? Maybe that's the wrong guess, hence the question.

Correct, Kevin. I want to use Logstash in the middle to implement a store-and-forward mechanism while moving from on-prem to cloud, and I cannot lose any document that is sent.

Let's start by saying that I am new to Elastic, so my approach to this problem may be peculiar, but it is as simple as I could think of: I have an HTTP request, and I am sure it will succeed if I forward the same request to the same endpoint.

I have a Delphi application that sends HTTP requests to the Elasticsearch URL "myhost:9200/_bulk" with this body:

{ "index" : { "_index" : "journaling_insert"}}

{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}

{ "index" : { "_index" : "journaling_insert"}}

{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}

So, a JSON index action plus the document to write.

My idea was to forward the same request.

With the elasticsearch plugin added, I don't get an error, but I don't see any document written.
What do you think?

No, the printed representation of your message contains escape chars. The [message] field itself does not.

I can see two ways to do this. If you do something like

    mutate { gsub => [ "message", "\A", "[", "message", "\n", ",", "message", "\Z", "]" ] }
    json { source => "message" target => "[@metadata][bulk]" remove_field => [ "message" ] }

you will end up with

 "@metadata" => {
    "bulk" => [
        [0] {
            "index" => {
                "_index" => "journaling_insert"
            }
        },
        [1] {
            "STOREATTRIBUTE3" => "Passed Value",
                   "REMOTEIP" => "1.111.1.11",
                    "EVENTID" => "16",
            "STOREATTRIBUTE5" => "StoreDB Value",
                 "FLSECURITY" => {
                "SID" => "1111"
            },
            "STOREATTRIBUTE4" => "StoreDB Value",
                 "FLCUSTOMER" => {
                 "LASTNAME" => "the Grey",
                "FIRSTNAME" => "Gandalf"
            },
            "STOREATTRIBUTE2" => "StoreDB Value",
                   "DATETIME" => "2025-03-07T19:14:58.400",
                  "CHAINCODE" => "8971"
        },
        [2] {
            "index" => {
                "_index" => "journaling_insert"
            }
        },
        ...

You would need to split that pairwise to get each document and the name of the index to write it to. That would require ruby.

You can then move all the fields of the document up to the root, and use a sprintf reference to pick the index in the output

index => "%{[@metadata][index][_index]}"

That requires a bunch of knowledge about the format of _bulk requests, and will break for anything other than a simple index command (so no support for update, delete, create etc.)
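If you did want to go that route anyway, a rough, untested sketch might look something like this (bulk_docs and target_index are just placeholder names):

    ruby {
      code => '
        bulk = event.get("[@metadata][bulk]") || []
        docs = []
        # walk the parsed array two entries at a time: action line, then source document
        bulk.each_slice(2) do |action, source|
          next if action.nil? || source.nil? || action["index"].nil?
          docs << { "target_index" => action["index"]["_index"], "doc" => source }
        end
        event.set("bulk_docs", docs)
      '
    }
    split { field => "bulk_docs" }

After the split you would still need to move everything under [bulk_docs][doc] up to the root and use index => "%{[bulk_docs][target_index]}" in the elasticsearch output, and it still only handles plain index actions.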

Alternatively, you already have exactly the request body that the _bulk endpoint expects, so I would suggest returning to your original approach with an http output. How exactly to reference %{[message]} in the output I cannot say, since I do not have an HTTP endpoint handy to test.

I think I understand, but maybe (almost certainly) @leandrojmp can explain better.

Your application isn't sending just the documents; it's also sending the { "index" : { "_index" : "journaling_insert"}} lines. If you instead send that to Logstash, well, I'm honestly not sure, but more commonly Logstash receives a stream of data that it tries to turn into documents (sometimes that is a no-op), and you define the Elasticsearch index to use in Logstash's elasticsearch output. It might be as simple as skipping those (in a sense unnecessary) action lines.

Well, without seeing the config and the logs, I just don't know.

Sorry, one other point before I forget: originally you wrote

Well, for this to work the way you want, there are other steps/configurations involved. And PQs aim for at-least-once delivery, which doesn't (always) mean exactly-once delivery! Bear that in mind here.

I am following the suggestion of sending the "message" field as it comes from the request:

# Logstash configuration for receiving HTTP requests and
# forwarding the body to an HTTP endpoint.

input {
	http {
		port => 5043
	}
}
filter {
	mutate {
		add_field => { 
			"full_url" => "http://localhost:9200%{[headers][request_path]}" 
		} 
		add_field => { 
			"original_content_type" => "%{[headers][content_type]}"
		}
		add_field => { 
			"original_path" => "%{[headers][request_path]}"
		}
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
#	elasticsearch {
#		hosts => "localhost:9200"
#	}
 	http {
		http_method=>"post"
		url => "%{[full_url]}"
		content_type => "%{[original_content_type]}"
		message => "%{[message]}"
	}
}

and I receive a 400 bad request:

D:\logstash-7.17.28\bin>logstash.bat -f D:\logstash-7.17.28\config\http-logstash.conf
"Using bundled JDK: D:\logstash-7.17.28\jdk\bin\java.exe"
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to D:/logstash-7.17.28/logs which is now configured via log4j2.properties
[2025-03-10T11:11:23,893][INFO ][logstash.runner          ] Log4j configuration path used is: D:\logstash-7.17.28\config\log4j2.properties
[2025-03-10T11:11:23,906][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.17.28", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.26+4 on 11.0.26+4 +indy +jit [mswin32-x86_64]"}
[2025-03-10T11:11:23,908][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[2025-03-10T11:11:23,985][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2025-03-10T11:11:27,553][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2025-03-10T11:11:28,467][INFO ][org.reflections.Reflections] Reflections took 62 ms to scan 1 urls, producing 119 keys and 419 values
[2025-03-10T11:11:30,075][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["D:/logstash-7.17.28/config/http-logstash.conf"], :thread=>"#<Thread:0x353d51e6 run>"}
[2025-03-10T11:11:30,711][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.63}
[2025-03-10T11:11:30,901][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2025-03-10T11:11:30,911][INFO ][logstash.inputs.http     ][main][09769fcd51041c7ee09fd2b2c95157c792b432524dd9dfc651525ec1b431ca60] Starting http input listener {:address=>"0.0.0.0:5043", :ssl=>"false"}
[2025-03-10T11:11:30,963][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2025-03-10T11:11:54,634][INFO ][logstash.outputs.file    ][main][9701914d7bae3b4de7fadf7f1686858d2ec2878b7199c60c7d55bd43f3818235] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-10_15.11.54.451.log"}
[2025-03-10T11:11:54,952][ERROR][logstash.outputs.http    ][main][0321af037eb473721499dd38440c94e1f1b26f0ef0fd19394ea75bebf7f07546] [HTTP Output Failure] Encountered non-2xx HTTP code 400 {:response_code=>400, :url=>"http://localhost:9200/_bulk", :event=>#<LogStash::Event:0xab2abc2>}
[2025-03-10T11:12:20,942][INFO ][logstash.outputs.file    ][main][9701914d7bae3b4de7fadf7f1686858d2ec2878b7199c60c7d55bd43f3818235] Closing file D:/log_streaming/my_app/log-2025-03-10_15.11.54.451.log

Using an endpoint simulator, Beeceptor, I can see this message arriving:

{
    "@version": "1",
    "@timestamp": "2025-03-10T15:19:26.610Z",
    "message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-10T15:19:26.493\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Francesco\",\"LASTNAME\":\"Esposito\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-10T15:19:26.493\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Sam\",\"LASTNAME\":\"Stein\"}}\r\n",
    "full_url": "https://esposito.free.beeceptor.com/_bulk",
    "host": "127.0.0.1",
    "original_path": "/_bulk",
    "headers": {
        "accept_encoding": "gzip",
        "content_type": "application/x-ndjson; charset=utf-8",
        "http_host": "localhost:5043",
        "content_length": "776",
        "http_user_agent": "Mozilla/4.0 (compatible; Clever Internet Suite)",
        "request_method": "POST",
        "connection": "Keep-Alive",
        "http_accept": "*/*",
        "request_path": "/_bulk",
        "http_version": "HTTP/1.1",
        "cache_control": "no-cache"
    },
    "original_content_type": "application/x-ndjson; charset=utf-8"
}

How can I extract just the field "message" from the body?
I guess it's going to work if I can just send what I need.

Trying something like:

...
		add_field => { 
			"original_message" => "%{[message]}"
		}
...
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
    http {
		http_method=>"post"
		url => "%{[full_url]}"
		content_type => "%{[original_content_type]}"
		message => "%{[original_message]}"
	}
}

As expected, my output file now contains original_message as one of the fields:

{
    "message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Francesco\",\"LASTNAME\":\"Esposito\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Sam\",\"LASTNAME\":\"Stein\"}}\r\n",
    "original_content_type": "application/x-ndjson; charset=utf-8",
    "original_message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Francesco\",\"LASTNAME\":\"Esposito\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Sam\",\"LASTNAME\":\"Stein\"}}\r\n",
    "headers": {
        "http_accept": "*/*",
        "http_host": "localhost:5043",
        "http_user_agent": "Mozilla/4.0 (compatible; Clever Internet Suite)",
        "content_length": "776",
        "connection": "Keep-Alive",
        "request_method": "POST",
        "cache_control": "no-cache",
        "content_type": "application/x-ndjson; charset=utf-8",
        "request_path": "/_bulk",
        "accept_encoding": "gzip",
        "http_version": "HTTP/1.1"
    },
    "@version": "1",
    "host": "127.0.0.1",
    "original_path": "/_bulk",
    "@timestamp": "2025-03-10T15:46:45.297Z",
    "full_url": "https://esposito.free.beeceptor.com/_bulk"
}

But I cannot understand why the HTTP message sent to my endpoint simulator is still the whole event:

{
    "message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Francesco\",\"LASTNAME\":\"Esposito\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Sam\",\"LASTNAME\":\"Stein\"}}\r\n",
    "original_content_type": "application/x-ndjson; charset=utf-8",
    "original_message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Francesco\",\"LASTNAME\":\"Esposito\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-10T15:46:45.215\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Sam\",\"LASTNAME\":\"Stein\"}}\r\n",
    "headers": {
        "http_accept": "*/*",
        "http_host": "localhost:5043",
        "http_user_agent": "Mozilla/4.0 (compatible; Clever Internet Suite)",
        "content_length": "776",
        "connection": "Keep-Alive",
        "request_method": "POST",
        "cache_control": "no-cache",
        "content_type": "application/x-ndjson; charset=utf-8",
        "request_path": "/_bulk",
        "accept_encoding": "gzip",
        "http_version": "HTTP/1.1"
    },
    "@version": "1",
    "host": "127.0.0.1",
    "original_path": "/_bulk",
    "@timestamp": "2025-03-10T15:46:45.297Z",
    "full_url": "https://esposito.free.beeceptor.com/_bulk"
}

That's weird! What do you guys think?

If you are able to upgrade the http output to 5.6.1 then you will get a more informative error message logged, replacing :event with response.body.

Otherwise I suggest checking if elasticsearch logs a useful error message about what you tried to send to it.

My suggestion is to use the elasticsearch output.

The http output might eventually work, but I see no functional benefit.
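Since the final destination is Elastic Cloud, a minimal sketch would be something along these lines (cloud_id and api_key are placeholders, and the index is fixed here rather than taken from the bulk action lines):

    elasticsearch {
      cloud_id => "deployment-name:placeholder"
      api_key  => "id:api_key_value"
      index    => "journaling_insert"
    }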

This problem is basically explained: the request I am sending contains the entire event rather than just the "message" field, so of course it is a "Bad Request".

Yes, I am working in two directions:

  1. Use the elasticsearch output, changing the way my app writes requests so that it sends one document at a time, avoiding the bulk format.

  2. Try to understand why I am not able to extract just the request body from the "message" field.

That way, I can have those two proofs of concept.

@RainTown, do you have any suggestion about why I cannot get just the original body using the config shown above?

To be honest, I don't really want to help you get the http output working; my gut feeling is that it is not the best approach, and anyway I don't know how.

I also see this is more complex than I originally thought (in a general sense), so sorry for that.

Your previous model of sending the HTTP requests to the _bulk endpoint is and was fine when talking to Elasticsearch directly. You had a sequence of index operations and documents to index. It probably worked well.

But trying to insert Logstash into that flow, with HTTP input and output, is maybe not as straightforward as you/I imagined. For a start, the HTTP input adds a bunch of stuff by default, as you have already seen.

Here's the (almost) simplest possible pipeline:

input { http { host => "0.0.0.0" port => "8080" } }
output { file { codec => rubydebug path => "/tmp/debug.txt" } }

If I issue the curl command below, I can index the two docs you shared earlier in the thread directly into Elasticsearch.

$ cat sample.json
{ "index" : { "_index" : "new_index"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
{ "index" : { "_index" : "new_index"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}

$ curl -s -XPOST http://localhost:9200/new_index/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @sample.json | jq -c .
{"errors":false,"took":0,"items":[{"index":{"_index":"new_index","_id":"mo8YgpUBN4ZhZTUv5Pj-","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}},{"index":{"_index":"new_index","_id":"m48YgpUBN4ZhZTUv5Pj-","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}}]}

If I execute exactly the same command, except against Logstash on port 8080, I get this in the output file:

{
       "headers" => {
            "http_accept" => "*/*",
           "request_path" => "/new_index/_bulk",
         "request_method" => "POST",
              "http_host" => "localhost:8080",
           "http_version" => "HTTP/1.1",
        "http_user_agent" => "curl/7.58.0",
         "content_length" => "761",
           "content_type" => "application/x-ndjson"
    },
       "message" => "{ \"index\" : { \"_index\" : \"new_index\"}}\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-07T19:14:58.400\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Gandalf\",\"LASTNAME\":\"the Grey\"}}\n{ \"index\" : { \"_index\" : \"new_index\"}}\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-07T19:14:58.400\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Gandalf\",\"LASTNAME\":\"the Grey\"}}\n",
    "@timestamp" => 2025-03-10T22:05:11.515323139Z,
          "host" => "127.0.0.1",
      "@version" => "1"
}

The output from the input stage is the input for the output stage; it works a bit like a Unix pipe. So your documents and the index operation commands are both stored in the intermediate document's message field. You are going to need to pull that out and decide whether you want any of the other fields. You will also have to do something not dissimilar for the elasticsearch output; see the post from @Badger above.

A poor man's approach is below; the config works, in a sense I will define in a second:

input {
  http {
    host => "0.0.0.0"
    port => "8080"
  }
}
filter {
  prune {
    interpolate => true
    whitelist_names => ["message"]
  }
  json {
    source => "message"
    remove_field => "message"
  }
}
output {
  file {
    codec => rubydebug
    path => "/tmp/debug.txt"
  }
  elasticsearch {
    hosts=>["localhost:9200"]
    index=>"new_index"
  }
}

So that just takes each HTTP request on port 8080, assumes (sort of) that it is a single JSON document on a single line, extracts that line from message into its original fields, removes message, and indexes into Elasticsearch's new_index index.

So if I index the two documents, on lines 2 and 4 of your sample above, via:

$ sed -n 4p sample.json | curl -s -XPOST http://localhost:8080 -H 'Content-Type: application/x-ndjson' --data-binary @- && echo
ok
$ sed -n 2p sample.json | curl -s -XPOST http://localhost:8080 -H 'Content-Type: application/x-ndjson' --data-binary @- && echo
ok

then both those docs are indexed correctly into that index in elasticsearch

$ curl -s  http://localhost:9200/new_index/_search | jq -c '.hits.hits[]'
{"_index":"new_index","_id":"yI9SgpUBN4ZhZTUvXvgn","_score":1,"_source":{"STOREATTRIBUTE2":"StoreDB Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE5":"StoreDB Value","EVENTID":"17","CHAINCODE":"8971","STOREATTRIBUTE3":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"DRAWERIDENT":"test","REMOTEIP":"1.111.1.11","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"STOREATTRIBUTE4":"StoreDB Value"}}
{"_index":"new_index","_id":"yY9SgpUBN4ZhZTUvbPgW","_score":1,"_source":{"STOREATTRIBUTE2":"StoreDB Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE5":"StoreDB Value","EVENTID":"16","CHAINCODE":"8971","STOREATTRIBUTE3":"Passed Value","FLSECURITY":{"SID":"1111"},"REMOTEIP":"1.111.1.11","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"STOREATTRIBUTE4":"StoreDB Value"}}

Is that really elegant? No, not at all. Is it tested? No, not at all.

Others can likely take a similar (or better) approach to munge the data from HTTP input to HTTP or Elasticsearch output, being able to handle multiple documents/lines at a time.
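For example, an (equally untested) variant that splits the message into its individual lines, drops the bulk action lines, and parses what is left might be:

filter {
  prune {
    interpolate => true
    whitelist_names => ["message"]       # as above, keep only the raw request body
  }
  split { field => "message" }           # one event per line of the body
  if [message] =~ /^\s*\{\s*"index"\s*:/ {
    drop { }                             # discard the bulk action lines
  }
  json {
    source => "message"
    remove_field => ["message"]
  }
}

Like the prune version, this throws away the _index from the action lines, so the elasticsearch output still needs a fixed (or otherwise derived) index.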

Good morning all,
As you are suggesting, I am changing my Delphi app to send requests to Logstash in a format that allows the elasticsearch output plugin to write my documents natively, instead of creating this bypass.

But just to give you feedback on the progress of my "wrong solution": I made it work, and the only problem was the "format" parameter, which needed to be set to "format => message". If you are interested, and if it can be useful, I can post the config here.

Sure, post the working input http --> output http config; it might help someone else.

And I think you made the right call - logstash's elasticsearch output uses the _bulk endpoint by default.
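Sure, here it is. The http output block is essentially the one I posted earlier, just with the format parameter added (the filter that builds full_url and original_content_type is unchanged):

output {
    http {
        http_method => "post"
        url => "%{[full_url]}"
        content_type => "%{[original_content_type]}"
        format => "message"
        message => "%{[message]}"
    }
}

With format => "message", the http output sends only the rendered message string as the request body instead of serializing the whole event as JSON, which is why the _bulk endpoint accepts it.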