Logstash shuts down automatically shortly after startup - is this normal?

Logstash: 7.17.9
Config file:

input {
  elasticsearch {
    hosts => ["10.251.0.11:39202"]
    index => "new_index_001"
    docinfo => true
    scroll => "30s"
    size => 500
  }
}

filter {
  mutate {
    remove_field => ["@timestamp","@version"]
  }
}

#output {
#  elasticsearch {
#    hosts => ["http://localhost:9200"]
#    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#    #user => "elastic"
#    #password => "changeme"
#  }
#}

output {
  kafka {
    bootstrap_servers => "10.10.1.13:19092,10.10.1.12:19092,10.10.1.11:19092"
    topic_id => "es613-input-01"
    codec => "json"
  }
}

When I start Logstash, it shuts itself down shortly afterwards.

Command line:
logstash -f es6_to_kafka.cfg --path.data /home/logstash/logstash-7.17.9/input.es6/

Log file:

[2023-07-09T12:13:11,000][INFO ][org.apache.kafka.clients.producer.ProducerConfig][main] ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [10.10.1.13:19092, 10.10.1.12:19092, 10.10.1.11:19092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 50
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[2023-07-09T12:13:11,010][DEBUG][org.apache.kafka.clients.CommonClientConfigs][main] Disabling exponential reconnect backoff because reconnect.backoff.ms is set, but reconnect.backoff.max.ms is not.
[2023-07-09T12:13:11,062][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Starting Kafka producer I/O thread.
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.5.1
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: 0efa8fb0f4c73d92
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1688875991062
[2023-07-09T12:13:11,066][DEBUG][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Kafka producer started
[2023-07-09T12:13:11,067][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initialize connection to node 10.10.1.13:19092 (id: -1 rack: null) for sending metadata request
[2023-07-09T12:13:11,068][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node 10.10.1.13:19092 (id: -1 rack: null) using address /10.10.1.13
[2023-07-09T12:13:11,078][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[2023-07-09T12:13:11,193][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>100, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>200, "pipeline.sources"=>["/home/logstash/logstash-7.17.9/config/es6_to_kafka.cfg"], :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:11,301][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
[2023-07-09T12:13:11,301][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node -1.
[2023-07-09T12:13:11,444][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node -1: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 2], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 1], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 2], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(48): 0 to 1, UNKNOWN(49): 0 to 1, UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
[2023-07-09T12:13:11,446][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.10.1.13:19092 (id: -1 rack: null)
[2023-07-09T12:13:11,460][INFO ][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Cluster ID: 3Btp8pAGT_-JvqZv2XADIw
[2023-07-09T12:13:11,460][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='3Btp8pAGT_-JvqZv2XADIw', nodes={1=10.10.1.13:19092 (id: 1 rack: null), 2=10.10.1.12:19092 (id: 2 rack: null), 3=10.10.1.11:19092 (id: 3 rack: null)}, partitions=[], controller=10.10.1.13:19092 (id: 1 rack: null)}
[2023-07-09T12:13:11,923][DEBUG][org.logstash.config.ir.CompiledPipeline][main] Compiled filter
 P[filter-mutate{"remove_field"=>["@timestamp", "@version"]}|[file]/home/logstash/logstash-7.17.9/config/es6_to_kafka.cfg:15:1:```
mutate {
  remove_field => ["@timestamp","@version"]
}
```] 
 into 
 org.logstash.config.ir.compiler.ComputeStepSyntaxElement@53e5cb12
....
[2023-07-09T12:13:12,254][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>1.05}
[2023-07-09T12:13:12,717][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2023-07-09T12:13:12,723][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2023-07-09T12:13:13,427][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-07-09T12:13:13,438][DEBUG][logstash.javapipeline    ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,440][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-07-09T12:13:13,489][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-07-09T12:13:13,696][DEBUG][logstash.inputs.elasticsearch][main][deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17] Closing {:plugin=>"LogStash::Inputs::Elasticsearch"}
[2023-07-09T12:13:13,701][DEBUG][logstash.pluginmetadata  ][main][deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17] Removing metadata for plugin deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17
[2023-07-09T12:13:13,704][DEBUG][logstash.javapipeline    ][main] Input plugins stopped! Will shutdown filter/output workers. {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,712][DEBUG][logstash.javapipeline    ][main] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<LogStash::WorkerLoopThread:0x6aee0225 run>"}
[2023-07-09T12:13:13,739][DEBUG][logstash.filters.mutate  ][main][bdcfc005a46d9e9adc9977589c77d7915ee81b2203841c6e9074076a6203df22] filters/LogStash::Filters::Mutate: removing field {:field=>"@timestamp"}
....
[2023-07-09T12:13:13,832][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initialize connection to node 10.10.1.12:19092 (id: 2 rack: null) for sending metadata request
[2023-07-09T12:13:13,833][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node 10.10.1.12:19092 (id: 2 rack: null) using address /10.10.1.12
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node 2. Fetching API versions.
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node 2.
[2023-07-09T12:13:13,839][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node 2: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 2], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 1], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 2], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(48): 0 to 1, UNKNOWN(49): 0 to 1, UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
[2023-07-09T12:13:13,839][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='es613-input-01')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.10.1.12:19092 (id: 2 rack: null)
[2023-07-09T12:13:13,844][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updating last seen epoch for partition es613-input-01-0 from null to epoch 0 from new metadata
...
[2023-07-09T12:13:13,845][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updating last seen epoch for partition es613-input-01-3 from null to epoch 0 from new metadata
[2023-07-09T12:13:13,849][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updated cluster metadata updateVersion 3 to MetadataCache{clusterId='3Btp8pAGT_-JvqZv2XADIw', nodes={1=10.10.1.13:19092 (id: 1 rack: null), 2=10.10.1.12:19092 (id: 2 rack: null), 3=10.10.1.11:19092 (id: 3 rack: null)}, partitions=[PartitionMetadata(, error=NONE, partition=es613-input-01-5, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,1, isr=3,1, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-0, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,3, isr=1,3, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-2, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,2, isr=3,2, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-1, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,1, isr=2,1, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-4, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,3, isr=2,3, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-3, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,2, isr=1,2, offlineReplicas=)], controller=10.10.1.13:19092 (id: 1 rack: null)}
[2023-07-09T12:13:13,868][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node 10.10.1.13:19092 (id: 1 rack: null) using address /10.10.1.13
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node 1. Fetching API versions.
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node 1.
[2023-07-09T12:13:13,873][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node 1: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 2], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 1], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 2], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(48): 0 to 1, UNKNOWN(49): 0 to 1, UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
[2023-07-09T12:13:13,947][DEBUG][logstash.javapipeline    ][main] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<LogStash::WorkerLoopThread:0x584528e6 dead>"}
[2023-07-09T12:13:13,949][DEBUG][logstash.filters.mutate  ][main] Closing {:plugin=>"LogStash::Filters::Mutate"}
[2023-07-09T12:13:13,950][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin bdcfc005a46d9e9adc9977589c77d7915ee81b2203841c6e9074076a6203df22
[2023-07-09T12:13:13,951][DEBUG][logstash.outputs.kafka   ][main] Closing {:plugin=>"LogStash::Outputs::Kafka"}
[2023-07-09T12:13:13,953][INFO ][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2023-07-09T12:13:13,953][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
[2023-07-09T12:13:13,960][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
[2023-07-09T12:13:13,962][DEBUG][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Kafka producer has been closed
[2023-07-09T12:13:13,962][DEBUG][logstash.pluginmetadata  ][main] Removing metadata for plugin 6d3ce71faa4075d3ba098a5d1ec2b8b3590d50460ad1978e43101e553a2a68c8
[2023-07-09T12:13:13,963][DEBUG][logstash.javapipeline    ][main] Pipeline has been shutdown {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,965][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2023-07-09T12:13:14,023][DEBUG][logstash.agent           ] Shutting down all pipelines {:pipelines_count=>0}
[2023-07-09T12:13:14,030][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>1}
[2023-07-09T12:13:14,034][DEBUG][logstash.agent           ] Executing action {:action=>LogStash::PipelineAction::Delete/pipeline_id:main}
[2023-07-09T12:13:14,043][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2023-07-09T12:13:14,049][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2023-07-09T12:13:14,058][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2023-07-09T12:13:14,060][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2023-07-09T12:13:14,060][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2023-07-09T12:13:14,098][DEBUG][logstash.agent           ] API WebServer has stopped running
[2023-07-09T12:13:14,099][INFO ][logstash.runner          ] Logstash shut down.

I think you are missing the schedule option:

input {
  elasticsearch {
    hosts => ["10.251.0.11:39202"]
    index => "new_index_001"
    docinfo => true
    scroll => "30s"
    size => 500
    schedule => "10 * * * *" # cron syntax: runs at minute 10 of every hour
  }
}

From the documentation:

There is no schedule by default. If no schedule is given, then the statement is run exactly once.
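For reference, the schedule option of the elasticsearch input uses rufus-scheduler cron syntax, so "10 * * * *" fires once per hour at minute 10, not every 10 seconds. A sketch of the same input polling every minute (the cron value is an illustration, not taken from the thread):

input {
  elasticsearch {
    hosts => ["10.251.0.11:39202"]
    index => "new_index_001"
    docinfo => true
    scroll => "30s"
    size => 500
    schedule => "* * * * *" # cron syntax: run the query once every minute
  }
}

Note that with a schedule the full query is re-run on every tick, so downstream consumers may see the same documents repeatedly.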

Thanks, but that didn't work.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.