To be honest, I'm reluctant to help you get the http output working; my gut feeling is that it's not the best approach, and in any case I don't know how.
I also see this is more complex than I originally thought (in a general sense), so sorry for that.
Your previous model of sending the HTTP requests to the _bulk endpoint is and was fine when talking to Elasticsearch directly. You had a sequence of index operations and documents to index. It probably worked well.
But trying to insert Logstash into that flow, with an HTTP input and output, isn't as straightforward as you/I imagined. For a start, the HTTP input adds a bunch of fields by default, as you've already seen.
Here's (almost) the simplest possible pipeline:
input { http { host => "0.0.0.0" port => "8080" } }
output { file { codec => rubydebug path => "/tmp/debug.txt" } }
If I issue the curl command below, I can index the 2 docs you shared earlier in the thread directly into Elasticsearch.
$ cat sample.json
{ "index" : { "_index" : "new_index"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
{ "index" : { "_index" : "new_index"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
$ curl -s -XPOST http://localhost:9200/new_index/_bulk -H 'Content-Type: application/x-ndjson' --data-binary @sample.json | jq -c .
{"errors":false,"took":0,"items":[{"index":{"_index":"new_index","_id":"mo8YgpUBN4ZhZTUv5Pj-","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}},{"index":{"_index":"new_index","_id":"m48YgpUBN4ZhZTUv5Pj-","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}}]}
If I execute exactly the same command, except pointed at Logstash on port 8080, I get this in the output file:
{
    "headers" => {
        "http_accept" => "*/*",
        "request_path" => "/new_index/_bulk",
        "request_method" => "POST",
        "http_host" => "localhost:8080",
        "http_version" => "HTTP/1.1",
        "http_user_agent" => "curl/7.58.0",
        "content_length" => "761",
        "content_type" => "application/x-ndjson"
    },
    "message" => "{ \"index\" : { \"_index\" : \"new_index\"}}\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"16\",\"STOREATTRIBUTE3\":\"Passed Value\",\"DATETIME\":\"2025-03-07T19:14:58.400\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLSECURITY\":{\"SID\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Gandalf\",\"LASTNAME\":\"the Grey\"}}\n{ \"index\" : { \"_index\" : \"new_index\"}}\n{\"REMOTEIP\":\"1.111.1.11\",\"CHAINCODE\":\"8971\",\"EVENTID\":\"17\",\"DRAWERIDENT\":\"test\",\"DATETIME\":\"2025-03-07T19:14:58.400\",\"STOREATTRIBUTE2\":\"StoreDB Value\",\"STOREATTRIBUTE3\":\"StoreDB Value\",\"STOREATTRIBUTE4\":\"StoreDB Value\",\"STOREATTRIBUTE5\":\"StoreDB Value\",\"FLTRANSACTIONATTRIBUTES\":{\"INVOICENUMBER\":\"1111\"},\"FLCUSTOMER\":{\"FIRSTNAME\":\"Gandalf\",\"LASTNAME\":\"the Grey\"}}\n",
    "@timestamp" => 2025-03-10T22:05:11.515323139Z,
    "host" => "127.0.0.1",
    "@version" => "1"
}
The output of the input stage is the input to the output stage; it works a bit like a Unix pipe. So your documents, and the index operation commands, are both stored in the intermediate document's message field. You are going to need to pull that out, and decide whether you want any of the other fields. You are also going to have to do something not dissimilar for the elasticsearch output too; see the post from @Badger above.
A poor man's approach is below. The config works, in a sense I will define in a second:
input {
  http {
    host => "0.0.0.0"
    port => "8080"
  }
}
filter {
  prune {
    interpolate => true
    whitelist_names => ["message"]
  }
  json {
    source => "message"
    remove_field => "message"
  }
}
output {
  file {
    codec => rubydebug
    path => "/tmp/debug.txt"
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "new_index"
  }
}
So that just takes each HTTP request on port 8080, assumes (sort of) that it's a single JSON document on a single line, extracts that line from message into its original fields, removes message, and indexes the result into Elasticsearch's new_index index.
So if I index the 2 documents, on lines 2 and 4 of your sample above, via:
$ sed -n 4p sample.json | curl -s -XPOST http://localhost:8080 -H 'Content-Type: application/x-ndjson' --data-binary @- && echo
ok
$ sed -n 2p sample.json | curl -s -XPOST http://localhost:8080 -H 'Content-Type: application/x-ndjson' --data-binary @- && echo
ok
then both of those docs are indexed correctly into that index in Elasticsearch:
$ curl -s http://localhost:9200/new_index/_search | jq -c '.hits.hits[]'
{"_index":"new_index","_id":"yI9SgpUBN4ZhZTUvXvgn","_score":1,"_source":{"STOREATTRIBUTE2":"StoreDB Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE5":"StoreDB Value","EVENTID":"17","CHAINCODE":"8971","STOREATTRIBUTE3":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"DRAWERIDENT":"test","REMOTEIP":"1.111.1.11","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"STOREATTRIBUTE4":"StoreDB Value"}}
{"_index":"new_index","_id":"yY9SgpUBN4ZhZTUvbPgW","_score":1,"_source":{"STOREATTRIBUTE2":"StoreDB Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE5":"StoreDB Value","EVENTID":"16","CHAINCODE":"8971","STOREATTRIBUTE3":"Passed Value","FLSECURITY":{"SID":"1111"},"REMOTEIP":"1.111.1.11","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"STOREATTRIBUTE4":"StoreDB Value"}}
Is that really elegant? No, not at all. Is it properly tested? No, not at all.
Others can likely take a similar (or better) approach to munge the data from the HTTP input to an HTTP or Elasticsearch output, handling multiple documents/lines at a time; a rough sketch of one way to do that is below.
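For what it's worth, here is an untested sketch of that: split the message field into one event per NDJSON line, drop the bulk action lines, and parse what remains. The drop condition is a crude assumption of mine that your actual documents never contain the string "_index" (only the action lines do); adjust it to whatever reliably distinguishes the two in your data.
input {
  http {
    host => "0.0.0.0"
    port => "8080"
  }
}
filter {
  # keep only the raw request body
  prune {
    interpolate => true
    whitelist_names => ["message"]
  }
  # one event per line of the NDJSON body (field and terminator default to "message" and newline)
  split {
    field => "message"
  }
  # crude: drop the bulk action lines, e.g. { "index" : { "_index" : "new_index"}}
  if [message] =~ /"_index"/ {
    drop { }
  }
  # parse the remaining document lines into their original fields
  json {
    source => "message"
    remove_field => "message"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "new_index"
  }
}
Same caveats as above apply: not elegant, and not tested beyond the reasoning here.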