Logstash not picking up / not processing all files with the elasticsearch output plugin

Hello again. I am testing Logstash options and have set up the following: I am using two pipelines. The first one creates files with this configuration:

input {
    http {
        port => 6043
    }
}
output {
    file {
        path => "/log_streaming/my_app/records/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
        flush_interval => 0
        stale_cleanup_interval => 1
        codec => line { format => "%{message}" }
    }
}

This produces files like:

{ "_id": "B9600FD9-C21D-4DFA-B5A8-9F43F1B39326", "REMOTEIP":"1.111.1.11","USERAGENT":"PostmanRuntime\/7.43.2","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-25T19:10:10.195","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Francesco","LASTNAME":"Esposito"}}
{ "_id": "A0EE58D7-4C04-469C-A11A-5B0A68210284", "REMOTEIP":"1.111.1.11","USERAGENT":"PostmanRuntime\/7.43.2","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-25T19:10:10.195","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Sam","LASTNAME":"Stein"}}

Then I have a second pipeline configured to process every line as a single event, so that I can pass the document_id parameter (this is the best approach I could think of):

input {
    file {
        path => "D:/log_streaming/my_app/records/*.log"
        start_position => "beginning"
        sincedb_path => "NUL"
        mode => "read"
        file_completed_action => "delete"
    }
}
filter {
    # parse each JSON line into top-level fields
    json {
        source => "message"
    }
    mutate {
        # keep the _id in metadata so it can be used as the document_id without being indexed
        rename => { "_id" => "[@metadata][_id]" }
        remove_field => "path"
        remove_field => "host"
        remove_field => "message"
        remove_field => "@version"
    }
}
output {
    file {
        path => "/log_streaming/my_app/logelastic/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
    }
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "journaling_insert"
        document_id => "%{[@metadata][_id]}"
        doc_as_upsert => "true"
    }
}

When the two pipelines run alternately (creating the files first, then activating the second one to process them), every line is inserted correctly.
The problem starts when I run the two pipelines simultaneously.
It looks like not all the lines are picked up (maybe because the files start being read before they are closed?).
Do you have any explanation for why this is happening, and suggestions for reading the files correctly when the two pipelines run simultaneously?

The read mode is used when the file is already fully written and will not receive any updates, which does not seem to be your case.

If your files are going to receive new lines, you need to use mode => "tail", or just remove the mode setting since tail is already the default.

You also need to remove the file_completed_action setting, which is not used in tail mode, and keep the start_position => "beginning" setting (in tail mode the default is to start at the end).
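
Something like this, as a minimal sketch based on the paths already in your config:

input {
    file {
        path => "D:/log_streaming/my_app/records/*.log"
        # tail is the default mode, so this line could also be omitted
        mode => "tail"
        # without this, tail mode starts reading newly discovered files at the end
        start_position => "beginning"
        # NUL discards the sincedb, so files still on disk are re-read from the start after a restart
        sincedb_path => "NUL"
    }
}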

@leandrojmp technically, each file is written in one shot and not updated after the first write. I have another Logstash configuration using the http output and it works perfectly:

input {
    file {
        path => "D:/log_streaming/my_app/records/*.log"
        start_position => "beginning"
        sincedb_path => "NUL"
        mode => "read"
        file_completed_action => "delete"
        # the pattern never matches, so the whole file is merged into a single event
        codec => multiline { pattern => "^saddasdasdasdasdas" negate => true what => "previous" }
    }
}
filter {
    mutate {
        add_field => {
            "full_url1" => "http://localhost:9200/_bulk"
        }
        add_field => {
            "full_url2" => "https://esposito.free.beeceptor.com/_bulk"
        }
        # the literal newline is intentional: the _bulk API requires the request body to end with a newline
        add_field => {
            "request_body" => "%{[message]}
"
        }
    }
}
output {
    file {
        path => "/log_streaming/my_app/loghttp/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"
    }
    http {
        http_method => "post"
        format => "message"
        url => "%{[full_url1]}"
        content_type => "application/json"
        message => "%{[request_body]}"
    }
}

and uses this type of file:

{ "index" : { "_index" : "journaling_insert", "_id" : "351F6C43-4786-4483-9963-D260BDEF0303"}}
{"REMOTEIP":"1.111.1.11","USERAGENT":"PostmanRuntime\/7.43.2","CHAINCODE":"8971","EVENTID":"16","STOREATTRIBUTE3":"Passed Value","DATETIME":"2025-03-24T23:01:02.298","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Francesco","LASTNAME":"Esposito"}}
{ "index" : { "_index" : "journaling_insert", "_id" : "DCC4A163-33A1-4AA0-AC95-D26CE51DA014"}}
{"REMOTEIP":"1.111.1.11","USERAGENT":"PostmanRuntime\/7.43.2","CHAINCODE":"8971","EVENTID":"17","DRAWERIDENT":"test","DATETIME":"2025-03-24T23:01:02.298","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"FLCUSTOMER":{"FIRSTNAME":"Sam","LASTNAME":"Stein"}}

This is because the http _bulk request obviously needs the _id passed in the index action line for each document.

What I am experiencing just seems like weird behavior. If you still think the problem is the mode and that I should switch to "tail", I will try that.

The files are the output of a pipeline with an http input, and they are then read by another pipeline with a file input in read mode.

It is possible that the file input reads a file while it is not finished yet, and then never reads it again.

The read mode works on files whose content is already complete when Logstash starts reading them; this is not your case.

You need to use the tail mode so Logstash will pick up new lines, but in tail mode Logstash cannot delete the files, so you will need to delete them with another tool.
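
If you go that route, something like this might help (the sincedb path is just an example location; close_older is the file input option that releases the handle of files that have not been read for the given duration, so an external cleanup job can delete them):

input {
    file {
        path => "D:/log_streaming/my_app/records/*.log"
        start_position => "beginning"
        # a persistent sincedb avoids re-reading files from the start after a restart
        sincedb_path => "D:/log_streaming/my_app/sincedb_records"
        # release the file handle shortly after the writer stops appending,
        # so an external cleanup task can delete the file on Windows
        close_older => "1 minute"
    }
}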

Also, you seem to be on Windows. Is log_streaming a network share? Network shares can also be problematic.

Thank you for the answer @leandrojmp.
I am exploring all the possibilities that Logstash offers, and the log_streaming folder is just a local folder on my PC.

So, do you think it is just pure luck that all documents are sent correctly in the http output plugin example?

I will try using tail mode, but I will also check whether there is an interval for discovering new files, so that the system has time to write them, or whether Logstash needs time to process the older files chronologically before the latest ones are written to disk.
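
For reference, these seem to be the file input settings that control that timing (the values shown are the documented defaults):

input {
    file {
        path => "D:/log_streaming/my_app/records/*.log"
        start_position => "beginning"
        sincedb_path => "NUL"
        # how often files that were already discovered are checked for new content
        stat_interval => "1 second"
        # the path glob is re-evaluated every discover_interval * stat_interval,
        # so new files are discovered roughly every 15 seconds with these defaults
        discover_interval => 15
    }
}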