Hi everyone, I hope you can help me. I need to set up Logstash to read from an AWS S3 bucket as input and send to OpenSearch as output. The input is a folder containing many CSV files.
The issue is that when Logstash starts and reaches "Pipeline started" (as you can see from the log below), it stays stuck forever.
Here is my configuration:
input {
  s3 {
    bucket => "my-bucket"
    region => "eu-west-1"
    prefix => "test_logstash/"
    codec => plain {
      charset => "ISO-8859-1"
    }
  }
}
output {
  opensearch {
    hosts => "opensearch_HOST:443"
    user => "my-user"
    password => "my-pwd"
    index => "logstash-logs-%{+YYYY.MM.dd}"
    ecs_compatibility => disabled
    ssl_certificate_verification => false
  }
}
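For completeness, since the input files are CSV, I was also planning to add a csv filter between input and output. This is just a sketch; the column names below are placeholders, not my real headers:

filter {
  csv {
    separator => ","
    # Placeholder column names for illustration only.
    columns => ["col1", "col2", "col3"]
  }
}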
From the Logstash log it seems that it connects successfully to both S3 and OpenSearch. Here is everything I get before it gets stuck:
Using bundled JDK: /root/logstash-7.16.2/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /root/logstash-7.16.2/logs which is now configured via log4j2.properties
[2022-12-01T17:25:24,912][INFO ][logstash.runner ] Log4j configuration path used is: /root/logstash-7.16.2/config/log4j2.properties
[2022-12-01T17:25:24,932][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.16.2", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.13+8 on 11.0.13+8 +indy +jit [linux-x86_64]"}
[2022-12-01T17:25:25,631][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2022-12-01T17:25:27,177][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2022-12-01T17:25:28,758][INFO ][org.reflections.Reflections] Reflections took 172 ms to scan 1 urls, producing 119 keys and 417 values
[2022-12-01T17:25:55,702][INFO ][logstash.outputs.opensearch][main] New OpenSearch output {:class=>"LogStash::Outputs::OpenSearch", :hosts=>["OPENSEARCHHOST:443"]}
[2022-12-01T17:25:55,761][WARN ][logstash.outputs.opensearch][main] ** WARNING ** Detected UNSAFE options in opensearch output configuration!
** WARNING ** You have enabled encryption but DISABLED certificate verification.
** WARNING ** To make sure your data is secure change :ssl_certificate_verification to true
[2022-12-01T17:25:56,489][INFO ][logstash.outputs.opensearch][main] OpenSearch pool URLs updated {:changes=>{:removed=>[], :added=>[OPENSEARCHHOST:443/]}}
[2022-12-01T17:25:57,364][WARN ][logstash.outputs.opensearch][main] Restored connection to OpenSearch instance {:url=>"OPENSEARCHHOST:443/"}
[2022-12-01T17:25:57,484][INFO ][logstash.outputs.opensearch][main] Cluster version determined (7.10.2) {:version=>7}
[2022-12-01T17:25:57,664][INFO ][logstash.outputs.opensearch][main] Using a default mapping template {:version=>7, :ecs_compatibility=>:disabled}
[2022-12-01T17:25:57,751][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/root/logstash-7.16.2/config/logstash.conf"], :thread=>"#<Thread:0x30578bb4 run>"}
[2022-12-01T17:25:59,304][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>1.55}
[2022-12-01T17:25:59,334][INFO ][logstash.inputs.s3 ][main] Registering {:bucket=>"MYBUCKET", :region=>"eu-west-1"}
[2022-12-01T17:26:00,564][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2022-12-01T17:26:00,685][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2022-12-01T17:26:04,433][INFO ][logstash.inputs.s3 ][main][5edf7936d84baf43ecb3ae4563b20d4d19480a230ba643cd2d19dd66eae2bf29] Using default generated file for the sincedb {:filename=>"/root/logstash-7.16.2/data/plugins/inputs/s3/sincedb_9bd4bb61d9cbaeaa1526f686b91e2669"}
Could you please help me? What is wrong here? This is the first time I have used Logstash.
Another question: do I need to set anything up on OpenSearch to prepare it to receive the data?
Looking forward to someone helping me.
Thank you a lot.
Edit: I added part of the debug-level logs:
org.logstash.config.ir.compiler.ComputeStepSyntaxElement@3fdc2b9c
[2022-12-02T13:10:50,702][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>1.78}
[2022-12-02T13:10:50,742][INFO ][logstash.inputs.s3 ][main] Registering {:bucket=>"MY-BUCKET", :region=>"eu-west-1"}
[2022-12-02T13:10:51,986][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2022-12-02T13:10:51,999][DEBUG][logstash.javapipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x131789a run>"}
[2022-12-02T13:10:52,034][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2022-12-02T13:10:52,205][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2022-12-02T13:10:53,861][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2022-12-02T13:10:53,867][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2022-12-02T13:10:55,230][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Found key {:key=>"test_logstash/"}
[2022-12-02T13:10:55,241][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Ignoring {:key=>"test_logstash/"}
[2022-12-02T13:10:55,242][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Found key {:key=>"test_logstash/logstash_test.csv"}
[2022-12-02T13:10:55,249][INFO ][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Using the provided sincedb_path {:sincedb_path=>"/dev/null"}
[2022-12-02T13:10:55,268][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Added to objects[] {:key=>"test_logstash/logstash_test.csv", :length=>1}
[2022-12-02T13:10:55,302][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Processing {:bucket=>"MY-BUCKET", :key=>"test_logstash/logstash_test.csv"}
[2022-12-02T13:10:55,318][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Downloading remote file {:remote_key=>"test_logstash/logstash_test.csv", :local_filename=>"/tmp/logstash/logstash_test.csv"}
[2022-12-02T13:10:55,699][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Processing file {:filename=>"/tmp/logstash/logstash_test.csv"}
[2022-12-02T13:10:55,945][DEBUG][logstash.outputs.opensearch][main][7fcc73fd5d929902f33e89b90a86349660a8f4d5d9b9301e9817b24d6159030b] Sending final bulk request for batch. {:action_count=>1, :payload_size=>161, :content_length=>161, :batch_offset=>0}
[2022-12-02T13:10:57,028][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[... the PeriodicFlush "Pushing flush onto pipeline." and JVM poller "collector name" DEBUG lines above repeat every few seconds until 13:11:52; trimmed for brevity ...]
[2022-12-02T13:11:53,001][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Found key {:key=>"test_logstash/"}
[2022-12-02T13:11:53,002][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Ignoring {:key=>"test_logstash/"}
[2022-12-02T13:11:53,004][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Found key {:key=>"test_logstash/logstash_test.csv"}
[2022-12-02T13:11:53,008][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Added to objects[] {:key=>"test_logstash/logstash_test.csv", :length=>1}
[2022-12-02T13:11:53,019][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Processing {:bucket=>"MY-BUCKET", :key=>"test_logstash/logstash_test.csv"}
[2022-12-02T13:11:53,021][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Downloading remote file {:remote_key=>"test_logstash/logstash_test.csv", :local_filename=>"/tmp/logstash/logstash_test.csv"}
[2022-12-02T13:11:53,242][DEBUG][logstash.inputs.s3 ][main][28cc012d9b766ac5ffb68a68729f31668a5ed4a8116843251e4a46032fa2c210] Processing file {:filename=>"/tmp/logstash/logstash_test.csv"}
[2022-12-02T13:11:53,350][DEBUG][logstash.outputs.opensearch][main][7fcc73fd5d929902f33e89b90a86349660a8f4d5d9b9301e9817b24d6159030b] Sending final bulk request for batch. {:action_count=>1, :payload_size=>161, :content_length=>161, :batch_offset=>0}
[2022-12-02T13:11:54,025][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2022-12-02T13:11:54,035][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2022-12-02T13:11:57,028][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
It seems that it is trying to send the data to OpenSearch, but why does it never finish?