Forwarding my bulk records from Http Input Plugin to Elasticsearch Output Plugin - Discuss the Elastic Stack

Hello there.

While moving from Elastic on-prem to Elastic Cloud, I am trying to use Logstash as a "store and forward" layer for creating documents in Elasticsearch. Until now I was writing directly to Elasticsearch. Now I want Logstash to collect my calls and execute them, taking advantage of the persistent queue so that I don't lose data during an internet outage: Logstash should wait for the connection to be re-established instead of failing.

Before, I was successfully sending POST requests to "http://myelasticinstance:9200/_bulk/" + my "body" (that contains index and data) and writing it to Elastic.

{ "index" : { "_index" : "journaling_insert"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-10T14:36:35.165","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Francesco","LASTNAME":"Esposito"}}
{ "index" : { "_index" : "journaling_insert"}}
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-10T14:36:35.165","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Sam","LASTNAME":"Stein"}}

Now, I want Logstash to receive my http call using the input http plugin and use elasticsearch output plugin to insert my documents in elastic.

If it helps, I can also modify my request in the following way:

{ "journaling_insert": [
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}},
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}} 
]}

Here journaling_insert is also the index that I need to write to.

I cannot work out how to write my Logstash config file so that the filters shape my data in a way the elasticsearch output plugin understands. I don't get any error back from Elasticsearch in the console, and nothing shows up in the Elasticsearch log folder, so it is hard for me to tell you what the problem is.

By the way, when sending one record at a time from Postman, I was able to index documents correctly using this config file:

input {
	http {
		port => 5043
	}
}
filter {
	mutate {
		remove_field => "headers"
		remove_field => "host"
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
	elasticsearch {
		hosts => "localhost:9200"
		index => "journaling_insert"
	}
}

The problem is that this forces all my documents into one specific index, and I cannot allow that: my documents could have different destinations.

To summarize, the question is: how can I format my input message so that the elasticsearch output plugin can ingest my bulk records, or how can I use a filter to modify my message and make it compatible with the elasticsearch output plugin?

Thanks in advance.

I thought I answered this in the other thread. Here.

As mentioned in your other post, if you were sending your events to Elasticsearch and now want to send them to Logstash, you should change the format of your message.

From what you shared, it seems that you have control over the source format. Can you change it so that it is easier for Logstash to work with?

For example, instead of sending this:

{ "journaling_insert": [
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}},
{"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}} 
]}

Can you send something like this?

[ 
    {"index_name": "journaling_insert", "payload": {"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}},
    {"index_name": "journaling_insert", "payload": {"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}} 
]

The second one would be way easier to parse for Logstash.
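If the client already produces _bulk bodies like the one in the first post, the reshaping could be done on the client side before the request is sent. The following is only a minimal sketch, assuming every action line is an "index" action as in the examples above; the function name bulk_to_items and the shortened sample documents are made up for illustration.

```python
import json


def bulk_to_items(bulk_body):
    """Turn a _bulk body (action line + source line pairs) into a list of
    {"index_name": ..., "payload": ...} objects."""
    # splitlines() copes with both Unix and Windows line endings.
    lines = [line for line in bulk_body.splitlines() if line.strip()]
    if len(lines) % 2 != 0:
        raise ValueError("expected action/source line pairs")
    items = []
    for action_line, source_line in zip(lines[0::2], lines[1::2]):
        action = json.loads(action_line)
        if "index" not in action:
            # Assumption: only "index" actions are handled in this sketch.
            raise ValueError("only 'index' actions are handled here")
        items.append({
            "index_name": action["index"]["_index"],
            "payload": json.loads(source_line),
        })
    return items


# Shortened sample documents, not the full records from the thread.
bulk = (
    '{ "index" : { "_index" : "journaling_insert"}}\n'
    '{"EVENTID":"16","CHAINCODE":"8971"}\n'
    '{ "index" : { "_index" : "virtualdoc_insert"}}\n'
    '{"EVENTID":"17","CHAINCODE":"8971"}\n'
)
items = bulk_to_items(bulk)
print(json.dumps(items, indent=2))
```

Each item keeps its own index_name, so documents with different destinations can travel in the same request.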

Do you mean that your index name can change? In the example you only shared one index destination. If the message format can change and the index needs to be different, you would need to share more examples, as this can change the answers or tips you get.

Thank you Leandro for the answer.

Yes, I can change the request however I want in order to simplify the Logstash config file.

Yes, with your format, the index could change within the same request, like this:

[ 
    {"index_name": "virtualdoc_insert", "payload": {"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}},
    {"index_name": "journaling_insert", "payload": {"REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}}} 
]

I have created it with your example so don't be concerned about the data.

Do you think it can work? Is there any limitation on writing to more than one index? What would the Logstash config file look like with your format?

If writing to multiple indices at once is a problem or a complication, I can rethink the way we send information to Logstash.

@Badger thank you for your answer and sorry if I didn't see your answer.

I will give it a shot in a minute, and I will update you.

In the meantime, I would like to simplify the config file so that it is easier to maintain in the future, since I am able to change my request.
So, if you have ideas for changing my request that would let me simplify the Logstash config, that would be great!

If you send that payload to Logstash using an http input with the json codec, Logstash will automatically parse your message.

It will generate one event per item in the bulk request you made, for example if you send an array with 100 json objects, you will have 100 events.
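On the client side, such a request could be put together roughly as follows. This is a sketch, not a tested client: port 5043 comes from the config in this thread, while localhost, the function name build_logstash_request, and the shortened documents are assumptions. The request is only built here; the actual send is left commented out because it needs a running Logstash.

```python
import json
import urllib.request


def build_logstash_request(items, url="http://localhost:5043"):
    """Build (but do not send) a POST carrying the whole array in one body."""
    body = json.dumps(items).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Shortened sample documents, not the full records from the thread.
items = [
    {"index_name": "journaling_insert", "payload": {"EVENTID": "16"}},
    {"index_name": "journaling_insert", "payload": {"EVENTID": "17"}},
]
request = build_logstash_request(items)
print(request.get_method(), request.full_url)

# To actually send it (requires the http input to be listening):
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status)
```

The array goes out as a single JSON body; the split into one event per element happens on the Logstash side.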

During parsing, a field named index_name will be created. You can then use this field in your Elasticsearch output to send the events to the correct index.

Something like this in your elasticsearch output.

index => "%{index_name}"

What should my filter {} look like?

input {
	http {
		port => 5043
		codec => "json"
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
	elasticsearch {
		hosts => "localhost:9200"
		index => "%{index_name}"
	}
}

Something like this?

If you do not want to make any changes to your events, you do not need a filter block. It is optional; only input and output are required.

This pipeline should work, you need to test it.

Using the configuration above, Logstash gives me a 200 OK response, and my output file contains:

{"@version":"1","headers":{"postman_token":"187ae4b0-3faa-4b55-9e51-83d73eec1a8d","connection":"keep-alive","request_path":"/_bulk","cache_control":"no-cache","accept_encoding":"gzip","request_method":"POST","http_accept":"*/*","content_length":"1349","http_host":"localhost:5043","http_version":"HTTP/1.1","content_type":"application/json","http_user_agent":"PostmanRuntime/7.43.0"},"host":"0:0:0:0:0:0:0:1","payload":{"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"REMOTEIP":"1.111.1.11","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE3":"Passed Value","FLSECURITY":{"SID":"1111"},"CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","DATETIME":"2025-03-07T19:14:58.400"},"index_name":"journaling_insert","@timestamp":"2025-03-14T14:23:35.560Z"}
{"@version":"1","headers":{"postman_token":"187ae4b0-3faa-4b55-9e51-83d73eec1a8d","connection":"keep-alive","request_path":"/_bulk","cache_control":"no-cache","accept_encoding":"gzip","request_method":"POST","http_accept":"*/*","content_length":"1349","http_host":"localhost:5043","http_version":"HTTP/1.1","content_type":"application/json","http_user_agent":"PostmanRuntime/7.43.0"},"host":"0:0:0:0:0:0:0:1","payload":{"FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"REMOTEIP":"1.111.1.11","DRAWERIDENT":"test","STOREATTRIBUTE3":"StoreDB Value","EVENTID":"17","STOREATTRIBUTE5":"StoreDB Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE4":"StoreDB Value","FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"},"CHAINCODE":"8971","STOREATTRIBUTE2":"StoreDB Value"},"index_name":"journaling_insert","@timestamp":"2025-03-14T14:23:35.560Z"}

But I have nothing in elasticsearch, not even an error... do you have a suggestion to see what happened?

This is the logstash stdout:

D:\logstash-7.17.28\bin>logstash.bat -f D:\logstash-7.17.28\config\http-logstash.conf
"Using bundled JDK: D:\logstash-7.17.28\jdk\bin\java.exe"
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to D:/logstash-7.17.28/logs which is now configured via log4j2.properties
[2025-03-14T10:16:50,815][INFO ][logstash.runner          ] Log4j configuration path used is: D:\logstash-7.17.28\config\log4j2.properties
[2025-03-14T10:16:50,824][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.17.28", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.26+4 on 11.0.26+4 +indy +jit [mswin32-x86_64]"}
[2025-03-14T10:16:50,826][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[2025-03-14T10:16:50,896][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2025-03-14T10:16:54,578][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2025-03-14T10:16:55,379][INFO ][org.reflections.Reflections] Reflections took 58 ms to scan 1 urls, producing 119 keys and 419 values
[2025-03-14T10:16:56,815][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2025-03-14T10:16:57,132][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2025-03-14T10:16:57,388][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2025-03-14T10:16:57,401][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch version determined (7.9.1) {:es_version=>7}
[2025-03-14T10:16:57,403][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2025-03-14T10:16:57,489][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2025-03-14T10:16:57,491][INFO ][logstash.outputs.elasticsearch][main] Config is not compliant with data streams. `data_stream => auto` resolved to `false`
[2025-03-14T10:16:57,537][INFO ][logstash.outputs.elasticsearch][main] Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[2025-03-14T10:16:57,568][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["D:/logstash-7.17.28/config/http-logstash.conf"], :thread=>"#<Thread:0x3c2e3955 run>"}
[2025-03-14T10:16:58,542][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.97}
[2025-03-14T10:16:58,707][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2025-03-14T10:16:58,719][INFO ][logstash.inputs.http     ][main][c9075852d40cd7134eeb7e714bccac6c3c491ccb056b6ae774e9edb935a5e984] Starting http input listener {:address=>"0.0.0.0:5043", :ssl=>"false"}
[2025-03-14T10:16:58,781][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2025-03-14T10:18:27,524][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-14_14.18.27.335.log"}
[2025-03-14T10:18:27,540][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-14_14.18.27.336.log"}
[2025-03-14T10:18:48,769][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Closing file D:/log_streaming/my_app/log-2025-03-14_14.18.27.335.log
[2025-03-14T10:18:48,775][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Closing file D:/log_streaming/my_app/log-2025-03-14_14.18.27.336.log
[2025-03-14T10:19:27,597][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-14_14.19.27.480.log"}
[2025-03-14T10:19:48,752][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Closing file D:/log_streaming/my_app/log-2025-03-14_14.19.27.480.log
[2025-03-14T10:22:07,419][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-14_14.22.07.287.log"}
[2025-03-14T10:22:28,738][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Closing file D:/log_streaming/my_app/log-2025-03-14_14.22.07.287.log
[2025-03-14T10:23:35,678][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-14_14.23.35.560.log"}
[2025-03-14T10:23:53,744][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Closing file D:/log_streaming/my_app/log-2025-03-14_14.23.35.560.log
[2025-03-14T10:28:26,216][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Opening file {:path=>"D:/log_streaming/my_app/log-2025-03-14_14.28.26.082.log"}
[2025-03-14T10:28:48,758][INFO ][logstash.outputs.file    ][main][c62641ea1d78f09d32e6b17aba1b9738f1718c769b45f81bd6e9ffd08e156c85] Closing file D:/log_streaming/my_app/log-2025-03-14_14.28.26.082.log

How did you validate this? Do you have Kibana?

Can you run GET _cat/indices?v in Kibana Dev Tools?

There is no error in your logstash logs, so it should be sent to elasticsearch without any issues.

Correct, I have Kibana. I don't see any new index or journaling_* indexes changing after my request.

If I send this same request directly to Elasticsearch, it doesn't work either:

{
    "error": {
        "root_cause": [
            {
                "type": "illegal_argument_exception",
                "reason": "Malformed action/metadata line [1], expected START_OBJECT but found [START_ARRAY]"
            }
        ],
        "type": "illegal_argument_exception",
        "reason": "Malformed action/metadata line [1], expected START_OBJECT but found [START_ARRAY]"
    },
    "status": 400
}
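For context on that error: the _bulk endpoint expects newline-delimited action/source pairs like the ones in the first post, so a body that starts with a JSON array is rejected on line 1. The array format is only meant for the Logstash http input. A rough sketch of the reverse mapping, assuming only "index" actions are needed (the function name items_to_bulk and the shortened documents are made up for illustration):

```python
import json


def items_to_bulk(items):
    """Render {"index_name", "payload"} items as a _bulk body:
    one action line followed by one source line per document."""
    lines = []
    for item in items:
        lines.append(json.dumps({"index": {"_index": item["index_name"]}}))
        lines.append(json.dumps(item["payload"]))
    # A _bulk body has to end with a newline.
    return "\n".join(lines) + "\n"


# Shortened sample documents, not the full records from the thread.
items = [
    {"index_name": "virtualdoc_insert", "payload": {"EVENTID": "16"}},
    {"index_name": "journaling_insert", "payload": {"EVENTID": "17"}},
]
bulk_body = items_to_bulk(items)
print(bulk_body, end="")
```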

Please share the result of GET _cat/indices?v as asked.

There is nothing wrong in the configuration, and since there is no error in your Logstash logs, we need to see what you have in Elasticsearch.

I tried to assist in the other thread, sorry that I could not help there, but one thing I noticed belatedly was your versions:

elasticsearch - 7.9.1
logstash - 7.17.28

I did all my own tests using 8.16/8.17, and the code from @Badger in that other thread seemed to work for me. YMMV. I'm not saying those versions are a problem, but they are another possible variable. I also notice Windows paths here, so there may also be complications with how Windows and Unix handle EOLs ('\r\n' vs '\n'), though @Badger's code did seem to try to handle that.

Okay, I see what is happening. There is a short delay on insert, and the records don't show up in Kibana because they are being written to my index, but with the wrong structure. Executing:

GET journaling-000002/_search
{
  "size": 3000, 
  "query": {
    "match_all": {}
  }
}

The latest 2 records inserted are:

{
  "took" : 10,
  "timed_out" : false,
  "_shards" : {
    "total" : 4,
    "successful" : 4,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 304,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [

.......

           {
                "_index": "journaling-000002",
                "_type": "_doc",
                "_id": "M4gOlZUBxgG-V5f-b5f7",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "headers": {
                        "postman_token": "ba0670d7-41e2-4701-bb99-47bbde9aa08c",
                        "connection": "keep-alive",
                        "request_path": "/_bulk",
                        "cache_control": "no-cache",
                        "accept_encoding": "gzip",
                        "request_method": "POST",
                        "http_accept": "*/*",
                        "content_length": "1349",
                        "http_host": "localhost:5043",
                        "http_version": "HTTP/1.1",
                        "content_type": "application/json",
                        "http_user_agent": "PostmanRuntime/7.43.0"
                    },
                    "host": "0:0:0:0:0:0:0:1",
                    "payload": {
                        "FLCUSTOMER": {
                            "FIRSTNAME": "Gandalf",
                            "LASTNAME": "the Grey"
                        },
                        "REMOTEIP": "1.111.1.11",
                        "STOREATTRIBUTE4": "StoreDB Value",
                        "STOREATTRIBUTE3": "Passed Value",
                        "FLSECURITY": {
                            "SID": "1111"
                        },
                        "CHAINCODE": "8971",
                        "EVENTID": "16",
                        "STOREATTRIBUTE2": "StoreDB Value",
                        "STOREATTRIBUTE5": "StoreDB Value",
                        "DATETIME": "2025-03-07T19:14:58.400"
                    },
                    "index_name": "journaling_insert",
                    "@timestamp": "2025-03-14T14:28:26.082Z"
                }
            },
            {
                "_index": "journaling-000002",
                "_type": "_doc",
                "_id": "24gnlZUBxgG-V5f-nJkN",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "headers": {
                        "postman_token": "7f2cf3d8-ea1e-403e-9e51-bd3194cff09c",
                        "connection": "keep-alive",
                        "request_path": "/_bulk",
                        "cache_control": "no-cache",
                        "accept_encoding": "gzip",
                        "request_method": "POST",
                        "http_accept": "*/*",
                        "content_length": "1349",
                        "http_host": "localhost:5043",
                        "http_version": "HTTP/1.1",
                        "content_type": "application/json",
                        "http_user_agent": "PostmanRuntime/7.43.0"
                    },
                    "host": "0:0:0:0:0:0:0:1",
                    "payload": {
                        "FLTRANSACTIONATTRIBUTES": {
                            "INVOICENUMBER": "1111"
                        },
                        "REMOTEIP": "1.111.1.11",
                        "DRAWERIDENT": "test",
                        "STOREATTRIBUTE3": "StoreDB Value",
                        "EVENTID": "17",
                        "STOREATTRIBUTE5": "StoreDB Value",
                        "DATETIME": "2025-03-07T19:14:58.400",
                        "STOREATTRIBUTE4": "StoreDB Value",
                        "FLCUSTOMER": {
                            "FIRSTNAME": "Gandalf",
                            "LASTNAME": "the Grey"
                        },
                        "CHAINCODE": "8971",
                        "STOREATTRIBUTE2": "StoreDB Value"
                    },
                    "index_name": "journaling_insert",
                    "@timestamp": "2025-03-14T14:55:55.787Z"
                }
            }
        ]
    }
}

What do you think?
I think it does not know that only the payload fields should be used to populate the document.

Thank you Kevin for answering.
Yes, I am using:
elasticsearch - 7.9.1
logstash - 7.17.28
The final destination in production will be version 8.x in the Cloud, but, at this moment, I cannot use it. Unfortunately, my local version that I was provided is this one.
I had to use Logstash 7.x because 8.x is not compatible with elastic 7.x. :frowning:

This is not correct, the index name is wrong.

With this output:

	elasticsearch {
		hosts => "localhost:9200"
		index => "%{index_name}"
	}

The index name is taken from the field index_name, so I don't see how this pipeline would index data into an index named journaling-000002. Are you sure that this is the only output configured?

Also, can you please run the command I asked for, to show which indices you have?

Yes, it's correct Leandro. The journaling_insert is an alias to trigger automatic rollovers and I can see that Logstash is correctly using "%{index_name}" fields. The problem is the content of the request that is the entire json incoming.

The records don't show up in Kibana because the DATETIME field is not populated and the Kibana query uses by default the DateTime filter.

Did you see my 2 latest records?

@leandrojmp, I forgot to include the result of the request GET _cat/indices?v:

health status index                          uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   journaling-000002              vkpFQQ4qSKKLhpRBpMhuZA   4   1        306            0    310.1kb        310.1kb
green  open   .apm-custom-link               Byg46ZVtSmOTITVEr88bDA   1   0          0            0       208b           208b
green  open   .kibana_task_manager_1         MmJ2fyBMT0mxEy7A3q57ag   1   0          6          616    426.3kb        426.3kb
green  open   .kibana-event-log-7.9.1-000004 dekjkxFWTGm_4RO6ADR4sA   1   0          2            0     10.9kb         10.9kb
yellow open   journaling-000001              Hmip23O8Rn2hVRqa3uuGKA   4   1         38            0      219kb          219kb
green  open   .apm-agent-configuration       -FHuKxhkT_KJnU0nhb5kuw   1   0          0            0       208b           208b
green  open   .kibana-event-log-7.9.1-000006 3lugYqLxRZ6VH9sCFKEUMw   1   0          4            0     21.5kb         21.5kb
green  open   .kibana-event-log-7.9.1-000005 6VtOXMpwQie7aVBsqjnkJQ   1   0          3            0     16.2kb         16.2kb
green  open   .async-search                  Ou6JTegdSk6GE5DgeSxgKg   1   0          0            0      3.3kb          3.3kb
green  open   .kibana_1                      OxHSpNOhS_OJRTfRNSrqWg   1   0        513           57     10.4mb         10.4mb

So journaling_insert is the write alias for your index? Not sure if I missed it or you didn't mention it.

Yes, this is how Logstash works: it sends the entire event. If you do not want some fields, you need to remove them in a filter, for example with mutate and remove_field.

If I understand correctly you want to send just this to your Elasticsearch:

  {
    "REMOTEIP": "1.111.1.11",
    "CHAINCODE": "8971",
    "EVENTID": "17",
    "DRAWERIDENT": "test",
    "DATETIME": "2025-03-07T19:14:58.400",
    "STOREATTRIBUTE2": "StoreDB Value",
    "STOREATTRIBUTE3": "StoreDB Value",
    "STOREATTRIBUTE4": "StoreDB Value",
    "STOREATTRIBUTE5": "StoreDB Value",
    "FLTRANSACTIONATTRIBUTES": {
      "INVOICENUMBER": "1111"
    },
    "FLCUSTOMER": {
      "FIRSTNAME": "Gandalf",
      "LASTNAME": "the Grey"
    }
  }

So, you would again need to change your message to something like this:

[ 
    {"index_name": "virtualdoc_insert", "REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}},
    {"index_name": "journaling_insert", "REMOTEIP":"1.111.1.11","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-07T19:14:58.400","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Gandalf","LASTNAME":"the Grey"}} 
]

Just moved all the keys to the root of the document, instead of putting them nested into another key named payload.
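If the client already builds the nested form with a payload key, the move to the root could be done before sending. A minimal sketch, assuming no document has its own field called index_name (the function name flatten_item and the shortened documents are made up for illustration):

```python
import json


def flatten_item(item):
    """Move the keys of item["payload"] to the root, next to index_name."""
    payload = item.get("payload", {})
    if "index_name" in payload:
        # Assumption: the routing key must not clash with a document field.
        raise ValueError("payload already has an index_name field")
    flat = {"index_name": item["index_name"]}
    flat.update(payload)
    return flat


# Shortened sample documents, not the full records from the thread.
nested = [
    {"index_name": "virtualdoc_insert",
     "payload": {"EVENTID": "16", "FLSECURITY": {"SID": "1111"}}},
    {"index_name": "journaling_insert",
     "payload": {"EVENTID": "17", "DRAWERIDENT": "test"}},
]
flat_items = [flatten_item(item) for item in nested]
print(json.dumps(flat_items, indent=2))
```

Nested objects such as FLSECURITY are left as they are; only the outer payload wrapper goes away.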

Then you should use a pipeline like this:

input {
	http {
		port => 5043
		codec => "json"
	}
}
filter {
  mutate {
    add_field => {
      "[@metadata][index]" => "%{index_name}"
    }
  }
  mutate {
    remove_field => ["index_name", "host", "headers", "@version"]
  }
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
	elasticsearch {
		hosts => "localhost:9200"
		index => "%{[@metadata][index]}"
	}
}

This adds a @metadata field holding the index name. The field can be referenced in the output block, but it will not be present in the final document.

Then you have another mutate to remove the undesired fields.

You need to keep the @timestamp field.
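To make the effect of the two mutate blocks concrete, here is a small Python model of what happens to one event. This is not Logstash code and does not call any Logstash API; the function name apply_filter_stage and the shortened event are made up for illustration.

```python
import copy
import json


def apply_filter_stage(event):
    """Model the two mutate blocks: returns (target_index, document)."""
    document = copy.deepcopy(event)
    # First mutate: copy index_name into @metadata. Logstash leaves the
    # literal text "%{index_name}" in place when the field does not exist.
    metadata = {"index": document.get("index_name", "%{index_name}")}
    # Second mutate: drop the routing and transport fields from the document.
    for field in ("index_name", "host", "headers", "@version"):
        document.pop(field, None)
    return metadata["index"], document


# Shortened event, modelled on the file output shown earlier in the thread.
event = {
    "@version": "1",
    "@timestamp": "2025-03-14T14:23:35.560Z",
    "host": "0:0:0:0:0:0:0:1",
    "headers": {"request_method": "POST"},
    "index_name": "journaling_insert",
    "EVENTID": "17",
    "DATETIME": "2025-03-07T19:14:58.400",
}
target_index, document = apply_filter_stage(event)
print(target_index)
print(json.dumps(document, indent=2))
```

In this model the document that reaches the index holds only the business fields plus @timestamp, and DATETIME sits at the root of the document.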