input {
  http {
    port => 5043
  }
}
filter {
  mutate {
    add_field => {
      "full_url1" => "http://localhost:9200%{[headers][request_path]}"
    }
    add_field => {
      "full_url2" => "https://anotherendpoint.com%{[headers][request_path]}"
    }
    add_field => {
      "original_content_type" => "%{[headers][content_type]}"
    }
    add_field => {
      "original_path" => "%{[headers][request_path]}"
    }
    add_field => {
      "request_body" => "%{[message]}"
    }
  }
}
output {
  file {
    path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
  }
  http {
    http_method => "post"
    format => "message"
    url => "%{[full_url1]}"
    content_type => "application/json"
    message => "%{[request_body]}"
  }
  http {
    http_method => "post"
    format => "message"
    url => "%{[full_url2]}"
    content_type => "application/json"
    message => "%{[request_body]}"
  }
}
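A quick way to exercise this pipeline is to POST an NDJSON bulk body at the http input; the port and the /_bulk path come from the config above, while the file name here is just a placeholder:
# Logstash answers "ok" when the http input accepts the request; the two http
# outputs then forward the raw body to http://localhost:9200/_bulk and to the
# second endpoint built from [headers][request_path].
curl -s http://localhost:5043/_bulk \
     -H 'Content-Type: application/x-ndjson' \
     --data-binary @bulk_payload.ndjson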
Excellent, yes, that seems to work well.
You should likely accept your own answer, and good luck with your project going forward.
@Badger, I am following the suggestion of moving away from the http output plugin and using the elasticsearch output plugin instead.
I have a request coming in from the http input plugin, as shown here:
{
  "@timestamp": "2025-03-10T20:16:17.408Z",
"message": "{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-10T20:16:17.404\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Francesco\",\"LASTNAME\":\"Esposito\"}}\r\n{ \"index\" : { \"_index\" : \"journaling_insert\"}}\r\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-10T20:16:17.404\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Sam\",\"LASTNAME\":\"Stein\"}}\r\n",
"host": "127.0.0.1",
"headers": {
"cache_control": "no-cache",
"http_version": "HTTP/1.1",
"connection": "Keep-Alive",
"request_method": "POST",
"http_user_agent": "Mozilla/4.0 (compatible; Clever Internet Suite)",
"content_type": "application/x-ndjson; charset=utf-8",
"request_path": "/_bulk",
"http_accept": "*/*",
"accept_encoding": "gzip",
"content_length": "776",
"http_host": "localhost:5043"
},
"@version": "1"
}
And I am not able to format the request in the right way to feed the elasticsearch output plugin with something like:
{ "index" : { "_index" : "journaling_insert"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-10T20:16:17.404","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Francesco","LASTNAME":"Esposito"}}
{ "index" : { "_index" : "journaling_insert"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-10T20:16:17.404","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Sam","LASTNAME":"Stein"}}
@RainTown I tried your approach, but it doesn't work; it generates this JSON, which I output to a file:
{
  "DRAWERIDENT": "test",
  "STOREATTRIBUTE3": "StoreDB Value",
  "@version": "1",
  "FLTRANSACTIONATTRIBUTES": {
    "INVOICENUMBER": "1111"
  },
  "DATETIME": "2025-03-11T20:33:51.690",
  "REMOTEIP": "1.111.1.11",
  "STOREATTRIBUTE5": "StoreDB Value",
  "CHAINCODE": "8971",
  "STOREATTRIBUTE4": "StoreDB Value",
  "EVENTID": "17",
  "@timestamp": "2025-03-11T20:33:51.877Z",
  "STOREATTRIBUTE2": "StoreDB Value",
  "FLCUSTOMER": {
    "LASTNAME": "Stein",
    "FIRSTNAME": "Sam"
  }
}
Any suggestions?
I have made some changes to my application and now I am sending something like this:
{ "journaling_index": [
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}},
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
]}
that generates:
{
  "@timestamp": "2025-03-12T19:24:41.307Z",
  "@version": "1",
  "journaling_index": [
    {
      "FLSECURITY": {
        "SID": "1111"
      },
      "STOREATTRIBUTE4": "StoreDB Value",
      "REMOTEIP": "1.111.1.11",
      "CHAINCODE": "8971",
      "EVENTID": "16",
      "STOREATTRIBUTE5": "StoreDB Value",
      "STOREATTRIBUTE3": "Passed Value",
      "DATETIME": "2025-03-07T19:14:58.400",
      "FLCUSTOMER": {
        "LASTNAME": "the Grey",
        "FIRSTNAME": "Gandalf"
      },
      "STOREATTRIBUTE2": "StoreDB Value"
    },
    {
      "STOREATTRIBUTE4": "StoreDB Value",
      "CHAINCODE": "8971",
      "FLTRANSACTIONATTRIBUTES": {
        "INVOICENUMBER": "1111"
      },
      "STOREATTRIBUTE3": "StoreDB Value",
      "DATETIME": "2025-03-07T19:14:58.400",
      "DRAWERIDENT": "test",
      "STOREATTRIBUTE2": "StoreDB Value",
      "REMOTEIP": "1.111.1.11",
      "EVENTID": "17",
      "STOREATTRIBUTE5": "StoreDB Value",
      "FLCUSTOMER": {
        "LASTNAME": "the Grey",
        "FIRSTNAME": "Gandalf"
      }
    }
  ]
}
Using the config:
input {
  http {
    port => 5043
  }
}
filter {
  mutate {
    remove_field => "headers"
    remove_field => "host"
  }
  json {
    source => "message"
    remove_field => "message"
  }
}
output {
  file {
    path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
  }
  elasticsearch {
    hosts => "localhost:9200"
    index => "journaling_insert"
  }
}
I am still missing how I can use my journaling_insert node as the index and the different records as the individual documents to insert. Could you guys support me with that?
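One possible approach (a sketch I have not tested; it assumes the payload shape posted above, where the array field is called journaling_index) is to keep the existing mutate and json filters, split the array into one event per element, lift each element's keys to the top level, and keep the fixed index name in the elasticsearch output:
filter {
  # ... existing mutate + json filters from the config above ...
  # one event per element of the journaling_index array
  split {
    field => "journaling_index"
  }
  # copy the element's keys to the top level, then drop the wrapper field
  ruby {
    code => '
      doc = event.get("journaling_index")
      doc.each { |k, v| event.set(k, v) } if doc.is_a?(Hash)
      event.remove("journaling_index")
    '
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "journaling_insert"
  }
}
If the wrapper node's name should itself decide the index, it would have to be copied into a field (for example under [@metadata]) and referenced with a sprintf expression in the index option.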
Different elasticsearch endpoints expect different input. You probably don't need the
{ "index" : { "_index" : "journaling_insert"}}
parts, just the documents. But you still need to do something similar with the http input to push just the documents to the elasticsearch output.
If you change your Delphi application to POST one document at a time to the HTTP endpoint then something like this will work:
input {
  http { host => "0.0.0.0" port => "8080" }
}
filter {
  json {
    source => "message"
    remove_field => ["message", "host", "headers", "@timestamp", "@version"]
  }
}
output {
  file { path => "/tmp/debug.txt" }
  elasticsearch { hosts => ["localhost:9200"] index => "new_index" }
}
NOTE: I am not claiming this is the optimal approach.
Let's go through the 3 phases.
input: your document (singular) arrives at the HTTP port, here port 8080; the path does not matter. The received document is effectively stored in the message field of a larger document, alongside various other fields like the HTTP headers, method, etc.
filter: this part essentially extracts the original document from the message field, and then throws away message and the rest of the fields.
output: this document is added to the (logstash-internal) queue of docs to be indexed to elasticsearch. At some later point in time (but within a very short interval) elasticsearch will index all the docs in the current queue into the new_index index. Note that logstash itself uses the _bulk endpoint by default.
So I tested the above config this way: I have your original docs stored in a file, where the actual docs are lines 2 and 4. I send them, line 2 and then line 4, to the HTTP input (port 8080 in my case) and they get stored into the index new_index.
$ cat sample.json
{ "index" : { "_index" : "new_index"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
{ "index" : { "_index" : "new_index"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
$ sed -n 2p sample.json | curl -s http://localhost:8080 --data-binary @- && echo $?
ok0
$ sed -n 4p sample.json | curl -s http://localhost:8080 --data-binary @- && echo $?
ok0
$ curl -s http://localhost:9200/new_index/_search | jq -Sc ".hits.hits[]._source"
{"CHAINCODE":"8971","DATETIME":"2025-03-07T19:14:58.400","EVENTID":"16","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"FLSECURITY":{"SID":"1111"},"REMOTEIP":"1.111.1.11","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"Passed Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value"}
{"CHAINCODE":"8971","DATETIME":"2025-03-07T19:14:58.400","DRAWERIDENT":"test","EVENTID":"17","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"REMOTEIP":"1.111.1.11","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value"}
I am sure there is a not-very-complex way to handle multiple documents at once, i.e. sending multiple docs to the HTTP endpoint at the same time, but ... I didn't have time to figure that out.
For the original data format, the following might work:
mutate { gsub => [ "message", "\r", "" ] }
# the quoted string below contains a literal newline, so this splits the bulk body into lines
mutate { split => { "message" => "
" } }
ruby {
  code => '
    msg = event.get("message")
    if msg.is_a? Array
      # peel off pairs of lines (action line + document line) into new events
      while msg.length > 1 do
        clone = event.clone
        clone.set("message", msg.shift(2))
        new_event_block.call(clone)
      end
      if msg.length > 0
        logger.warn("Content is uneven")
        event.set("message", msg)
      else
        event.cancel
      end
    end
  '
}
if [message][1] {
  json { source => "[message][1]" }
  json { source => "[message][0]" target => "[@metadata][operation]" }
}
and in the output section
if [@metadata][operation][index] {
  elasticsearch {
    index => "%{[@metadata][operation][index][_index]}"
    ....
As noted before, this doesn't handle create, update, etc.
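For completeness, one way the elided output above might be filled out; the hosts value and the fallback file output here are assumptions, not part of the original snippet:
output {
  if [@metadata][operation][index] {
    elasticsearch {
      hosts => ["localhost:9200"]                          # assumed local cluster
      index => "%{[@metadata][operation][index][_index]}"  # index name from the parsed bulk action line
    }
  } else {
    # action lines other than "index" (create, update, ...) are not handled; keep them for inspection
    file { path => "/tmp/unhandled_bulk_actions.txt" }     # hypothetical path
  }
}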
@Badger, what do you think about the new way I am testing, with a journaling_insert node that is named after the index I want to send the data to and that contains the data itself?
Could this make for an easier-to-build configuration? Could you help me again?
Sorry to bother you again with creating this configuration; I am trying to learn.
@RainTown unfortunately, I cannot send one document at a time. My operation needs to be atomic. I was able to make it work with one document simply by sending the JSON-formatted document and adding index => "journaling_insert" in the elasticsearch output. But, unfortunately, that is not usable, because the one requirement is that the insert has to be a single bulk of all the needed records.
Someone more knowledgeable than I am will need to confirm whether logstash guarantees this (atomicity) and, if so, in which scenarios (and specifically when the HTTP input is used), also in the context of persistent queues.
Plus, I don't think elasticsearch itself guarantees atomicity anyway, so ...
I know, and really, thank you for all the support; you have been great at helping me get started.
About this problem, I am just thinking that if I was able to do it through the http output, I should be able to do it through the elasticsearch output, even more so if bulk is the standard API used. Let's hope!
@Badger, what do you think about the new way I am testing, with a journaling_insert node that is named after the index I want to send the data to and that contains the data itself?
Could this make for an easier-to-build configuration? Could you help me again?
Sorry to bother you again with creating this configuration; I am trying to learn.
I am closing this thread with a specific one to solve my problem using http input and elasticsearch output.
That's fine. But can I just remind you that you wrote:
and I replied:
To quote from an older thread here:
All items of a bulk request are indexed individually, so failures will just affect parts of the bulk request. The success or failure of individual documents is reported in the response. You could therefore have e.g. 3 updates to the same document in a bulk request and have only the second one fail. There is no transactions or atomicity.
This has nothing to do with the http/elasticsearch/logstash issue you reported here. But unless I'm wrong or misunderstanding what you wrote, a lack of guaranteed atomicity is likely something you should consider how to mitigate. Not for this thread, but ... you may have a model based on an incorrect assumption. You probably don't want to find out the downsides of that the hard way.
Good luck with your project.
Just to clarify, what I meant by "atomic" is that I need to send the records to Logstash in one shot. Once I have received the 200 OK response, I am done with my job; after that, Logstash needs to take care of my packet.