Hello there.
Moving from Elastic on-prem to Elastic Cloud, I am trying to use Logstash as a "store and forward" layer for creating documents in Elasticsearch. In practice, where I used to write directly to Elasticsearch, I now want Logstash to collect my calls and execute them, taking advantage of the persistent queue feature so that no data is lost during an internet outage: Logstash should wait for the connection to be re-established instead of failing.
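For the persistent queue part, I plan to enable it in logstash.yml with settings along these lines (the queue path and size here are just my first guess):

# logstash.yml
queue.type: persisted                  # buffer events on disk instead of in memory
path.queue: /var/lib/logstash/queue    # directory for the queue pages
queue.max_bytes: 4gb                   # apply back pressure to inputs once the queue is full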
Before, I was successfully sending POST requests to "http://myelasticinstance:9200/_bulk/" with a body that contains the index and the data, and the documents were written to Elasticsearch:
{ "index" : { "_index" : "journaling_insert"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-10T14:36:35.165","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Francesco","LASTNAME":"Esposito"}}
{ "index" : { "_index" : "journaling_insert"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-10T14:36:35.165","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Sam","LASTNAME":"Stein"}}
Now I want Logstash to receive my HTTP call using the http input plugin, and then use the elasticsearch output plugin to insert my documents into Elasticsearch.
If it can help, I am also able to modify my request into the following shape:
{ "journaling_insert": [
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}},
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}
]}
Here journaling_insert, the top-level key, is also the index that I need to write to.
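The direction I was thinking of for this wrapped format is a filter like the one below, but I am not sure it is the right approach (the documents field name, the [@metadata][target_index] key, and the ruby/split combination are all my own guesses):

filter {
  # find the one top-level array field: its key is the destination index
  ruby {
    code => '
      event.to_hash.each do |key, value|
        next unless value.is_a?(Array)
        event.set("[@metadata][target_index]", key)
        event.set("documents", value)
        event.remove(key)
      end
    '
  }
  # emit one event per element of the array
  split {
    field => "documents"
  }
  # promote the nested document fields to the top level
  ruby {
    code => '
      doc = event.get("documents")
      doc.each { |k, v| event.set(k, v) } if doc.is_a?(Hash)
      event.remove("documents")
    '
  }
}

If this worked, each event would end up with its original fields at the top level and its destination index in [@metadata][target_index].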
What I cannot do is write a Logstash config whose filters reshape this data in a way the elasticsearch output plugin understands. I don't receive any error back from Elasticsearch in the console, and there is nothing in the Elasticsearch log folder, so it is hard for me to tell you what exactly goes wrong.
That said, sending one record at a time from Postman, I was able to index documents correctly using this config file:
input {
  http {
    port => 5043
  }
}

filter {
  mutate {
    remove_field => ["headers", "host"]
  }
}

output {
  file {
    path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
  }
  elasticsearch {
    hosts => "localhost:9200"
    index => "journaling_insert"
  }
}
The problem is that this forces every document into one specific index, and I cannot allow that: my documents can have different destinations.
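From the elasticsearch output documentation I understand that the index option accepts sprintf references, so I imagine the output could become something like this once every event carries its target index in @metadata (target_index is the name I made up in the filter sketch above):

output {
  elasticsearch {
    hosts => "localhost:9200"
    # resolved per event, so each document can go to a different index
    index => "%{[@metadata][target_index]}"
  }
}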
Summarizing, the question is: how can I format my input message so that the elasticsearch output plugin can ingest my bulk records, or how can I use filters to modify my message and make it compatible with the elasticsearch output plugin?
Thanks in advance.