I have written a spring boot application which starts the Logstash process internally by forming a command to get the conf file and trigger the command, similarly when i am trying to stop it via my application code the shutdown api call does not work,even using Process.destroy(), Process.destroyForcibly() cannot shutdown logstash completely.
Can anyone please suggest a way to stop Logstash via application code.
I do not see any documentation of a logstash shutdown API, so a 404 NotFound seems reasonable.
There is an elasticsearch shutdown API which does not affect logstash. It's also intended for use by other products and not supported for random folks like you and me. I don't even think it terminates the elasticsearch node, it just tells the node (using PUT) to stop doing any work so that it can be safely terminated (when polling with a GET indicates it is idle).
As I understand it, Process.destroyForcibly() is a Java function. It should work if you have the right Process object that you need to destroy, but that depends on all the code you are using to start and keep track of the logstash subprocess. That's not going to be a logstash question.
@Badger, Let me then rephrase my question, is there any way we can stop the logstash pipeline, via any API or something, or there is no way to do that and i have only to kill the process itself.
What i want is that Logstash pipeline should go down and should not take more logs to elasticSearch, the ProcessBuilder class of java which i used to destroy the LogStash Process is also not able to stop it. here is the code snippet written which starts the process and tries to stop it if any ERROR or FATAL logs appear:
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Service
public class PushLogService {
@Value("${logstash.dir.path}")
private String logstashDirPath;
@Value("${logstash.conf.file.path}")
private String configPath;
private static final Logger logger = LoggerFactory.getLogger(PushLogService.class);
private Process logstashProcess;
public int runLogStashProcess() throws InterruptedException{
if (logstashProcess != null && logstashProcess.isAlive()) {
logger.info("Logstash is already running.");
return 1;
}
String logstashCommand = logstashDirPath + " -f " + configPath;
try {
ProcessBuilder processBuilder = new ProcessBuilder(logstashCommand.split(" "));
processBuilder.redirectErrorStream(true);
logstashProcess = processBuilder.start();
// Capture Logstash output in a separate thread
Executors.newSingleThreadExecutor().submit(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(logstashProcess.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
//System.out.println(line);
logger.info(line);
// Look for potential startup failure messages
if (line.contains("ERROR") || line.contains("FATAL")) {
logger.error("Logstash encountered an error during startup: {}", line);
stopLogstashProcess();
}
}
} catch (IOException | InterruptedException e) {
logger.error("Error reading Logstash process output", e);
}
});
// Executors.newSingleThreadExecutor().submit(() -> {
// try {
// int exitCode = logstashProcess.waitFor();
// if (exitCode != 0) {
// logger.error("Logstash process terminated with exit code {}", exitCode);
// }
// } catch (InterruptedException e) {
// logger.error("Logstash process was interrupted", e);
// }
// });
logger.info("Logstash process started with config file: {}", configPath);
return 0;
}catch (IOException e) {
logger.error("Error starting Logstash", e);
return 2;
}
}
private void stopLogstashProcess() throws InterruptedException {
if (logstashProcess != null && logstashProcess.isAlive()) {
logstashProcess.destroy();
logger.info("Attempting to stop Logstash gracefully");
int exitCode = logstashProcess.waitFor();
if (exitCode != 0) {
logger.error("Logstash process terminated with exit code {}", exitCode);
logstashProcess.destroyForcibly();
logger.info("Logstash stopped forcibly");
}
}
}
}
But both destroy() and destroyForcibly() are not able to do so?Any suggestions or comments will be helpful
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.