Logstash version: 7.17.9
Config file:
input {
  elasticsearch {
    hosts => [""]
    index => "new_index_001"
    docinfo => true
    scroll => "30s"
    size => 500
  }
}
filter {
  mutate {
    remove_field => ["@timestamp","@version"]
  }
}
#output {
#  elasticsearch {
#    hosts => ["http://localhost:9200"]
#    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#    #user => "elastic"
#    #password => "changeme"
#  }
#}
output {
  kafka {
    bootstrap_servers => ",,"
    topic_id => "es613-input-01"
    codec => "json"
  }
}
When I start Logstash, it shuts itself down shortly afterwards.
Command line:
logstash -f es6_to_kafka.cfg --path.data /home/logstash/logstash-7.17.9/input.es6/
[2023-07-09T12:13:11,000][INFO ][org.apache.kafka.clients.producer.ProducerConfig][main] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [,,]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[2023-07-09T12:13:11,010][DEBUG][org.apache.kafka.clients.CommonClientConfigs][main] Disabling exponential reconnect backoff because reconnect.backoff.ms is set, but reconnect.backoff.max.ms is not.
[2023-07-09T12:13:11,062][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Starting Kafka producer I/O thread.
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.5.1
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: 0efa8fb0f4c73d92
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1688875991062
[2023-07-09T12:13:11,066][DEBUG][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Kafka producer started
[2023-07-09T12:13:11,067][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initialize connection to node (id: -1 rack: null) for sending metadata request
[2023-07-09T12:13:11,068][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node (id: -1 rack: null) using address /
[2023-07-09T12:13:11,078][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[2023-07-09T12:13:11,193][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>100, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>200, "pipeline.sources"=>["/home/logstash/logstash-7.17.9/config/es6_to_kafka.cfg"], :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:11,301][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
[2023-07-09T12:13:11,301][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node -1.
[2023-07-09T12:13:11,444][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node -1
[2023-07-09T12:13:11,446][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node (id: -1 rack: null)
[2023-07-09T12:13:11,460][INFO ][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Cluster ID: 3Btp8pAGT_-JvqZv2XADIw
[2023-07-09T12:13:11,460][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='3Btp8pAGT_-JvqZv2XADIw', nodes={1= (id: 1 rack: null), 2= (id: 2 rack: null), 3= (id: 3 rack: null)}, partitions=[], controller= (id: 1 rack: null)}
[2023-07-09T12:13:11,923][DEBUG][org.logstash.config.ir.CompiledPipeline][main] Compiled filter
P[filter-mutate{"remove_field"=>["@timestamp", "@version"]}|[file]/home/logstash/logstash-7.17.9/config/es6_to_kafka.cfg:15:1:```
mutate {
remove_field => ["@timestamp","@version"]
[2023-07-09T12:13:12,254][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>1.05}
[2023-07-09T12:13:12,717][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2023-07-09T12:13:12,723][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2023-07-09T12:13:13,427][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-07-09T12:13:13,438][DEBUG][logstash.javapipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,440][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-07-09T12:13:13,489][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-07-09T12:13:13,696][DEBUG][logstash.inputs.elasticsearch][main][deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17] Closing {:plugin=>"LogStash::Inputs::Elasticsearch"}
[2023-07-09T12:13:13,701][DEBUG][logstash.pluginmetadata ][main][deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17] Removing metadata for plugin deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17
[2023-07-09T12:13:13,704][DEBUG][logstash.javapipeline ][main] Input plugins stopped! Will shutdown filter/output workers. {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,712][DEBUG][logstash.javapipeline ][main] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<LogStash::WorkerLoopThread:0x6aee0225 run>"}
[2023-07-09T12:13:13,739][DEBUG][logstash.filters.mutate ][main][bdcfc005a46d9e9adc9977589c77d7915ee81b2203841c6e9074076a6203df22] filters/LogStash::Filters::Mutate: removing field {:field=>"@timestamp"}
[2023-07-09T12:13:13,832][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initialize connection to node (id: 2 rack: null) for sending metadata request
[2023-07-09T12:13:13,833][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node (id: 2 rack: null) using address /
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node 2. Fetching API versions.
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node 2.
[2023-07-09T12:13:13,839][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node 2
[2023-07-09T12:13:13,839][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='es613-input-01')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node (id: 2 rack: null)
[2023-07-09T12:13:13,844][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updating last seen epoch for partition es613-input-01-0 from null to epoch 0 from new metadata
[2023-07-09T12:13:13,845][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updating last seen epoch for partition es613-input-01-3 from null to epoch 0 from new metadata
[2023-07-09T12:13:13,849][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updated cluster metadata updateVersion 3 to MetadataCache{clusterId='3Btp8pAGT_-JvqZv2XADIw', nodes={1= (id: 1 rack: null), 2= (id: 2 rack: null), 3= (id: 3 rack: null)}, partitions=[PartitionMetadata(, error=NONE, partition=es613-input-01-5, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,1, isr=3,1, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-0, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,3, isr=1,3, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-2, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,2, isr=3,2, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-1, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,1, isr=2,1, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-4, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,3, isr=2,3, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-3, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,2, isr=1,2, offlineReplicas=)], controller= (id: 1 rack: null)}
[2023-07-09T12:13:13,868][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node (id: 1 rack: null) using address /
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node 1. Fetching API versions.
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node 1.
[2023-07-09T12:13:13,873][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node 1
[2023-07-09T12:13:13,947][DEBUG][logstash.javapipeline ][main] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<LogStash::WorkerLoopThread:0x584528e6 dead>"}
[2023-07-09T12:13:13,949][DEBUG][logstash.filters.mutate ][main] Closing {:plugin=>"LogStash::Filters::Mutate"}
[2023-07-09T12:13:13,950][DEBUG][logstash.pluginmetadata ][main] Removing metadata for plugin bdcfc005a46d9e9adc9977589c77d7915ee81b2203841c6e9074076a6203df22
[2023-07-09T12:13:13,951][DEBUG][logstash.outputs.kafka ][main] Closing {:plugin=>"LogStash::Outputs::Kafka"}
[2023-07-09T12:13:13,953][INFO ][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2023-07-09T12:13:13,953][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
[2023-07-09T12:13:13,960][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
[2023-07-09T12:13:13,962][DEBUG][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Kafka producer has been closed
[2023-07-09T12:13:13,962][DEBUG][logstash.pluginmetadata ][main] Removing metadata for plugin 6d3ce71faa4075d3ba098a5d1ec2b8b3590d50460ad1978e43101e553a2a68c8
[2023-07-09T12:13:13,963][DEBUG][logstash.javapipeline ][main] Pipeline has been shutdown {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,965][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2023-07-09T12:13:14,023][DEBUG][logstash.agent ] Shutting down all pipelines {:pipelines_count=>0}
[2023-07-09T12:13:14,030][DEBUG][logstash.agent ] Converging pipelines state {:actions_count=>1}
[2023-07-09T12:13:14,034][DEBUG][logstash.agent ] Executing action {:action=>LogStash::PipelineAction::Delete/pipeline_id:main}
[2023-07-09T12:13:14,043][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2023-07-09T12:13:14,049][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2023-07-09T12:13:14,058][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2023-07-09T12:13:14,060][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2023-07-09T12:13:14,060][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2023-07-09T12:13:14,098][DEBUG][logstash.agent ] API WebServer has stopped running
[2023-07-09T12:13:14,099][INFO ][logstash.runner ] Logstash shut down.