Logstash webhdfs output plugin AlreadyBeingCreatedException

Hello, I have created a Logstash configuration file that reads a CSV file with the file input and writes the data to HDFS through the webhdfs output. Here is the config file:

input {
  file {
    path => "C:/Users/user/Desktop/test.csv"
    start_position => "beginning"
    sincedb_path => "NULL"
  }
}
filter {
  csv {
    separator => ","
    columns => ["ID","Name","Description","Url","Keywords","Weight"]
  }
}
output {
  webhdfs {
    host => "{my-ip}"
    port => 9870
    path => "/test/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.csv"
    user => "USER123"
  }
  stdout { codec => rubydebug }
}

The first time I ran it, only one row of the CSV was stored at the path, and the following error was logged:

[2022-11-01T11:53:57,171][WARN ][logstash.outputs.webhdfs ][main] webhdfs write caused an exception:
{"RemoteException":{"exception":"AlreadyBeingCreatedException",
"javaClassName":"org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException",
"message":"Failed to APPEND_FILE /rakhshanda/temp123.csv for DFSClient_NONMAPREDUCE_-1932640012_83 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1092404658_79 on 127.0.0.1\r\n
\tat org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2706)\r\n
\tat org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)\r\n
\tat org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2741)\r\n
\tat org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:835)\r\n
\tat org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:503)\r\n
\tat org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)\r\n
\tat org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)\r\n
\tat org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)\r\n
\tat org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)\r\n
\tat org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)\r\n
\tat java.security.AccessController.doPrivileged(Native Method)\r\n
\tat javax.security.auth.Subject.doAs(Subject.java:422)\r\n
\tat org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)\r\n
\tat org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)\r\n"}}. Maybe you should increase retry_interval or reduce number of workers. Retrying...
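The warning's own suggestion maps to two settings of the webhdfs output. The first is retry_interval, the number of seconds to wait between retries. The second is retry_times, the number of retries before giving up. Below is a sketch of the output block with longer retries. The values 5 and 10 are illustrative assumptions, not tested recommendations. Retrying may not help at all if another client keeps holding the lease.

```
output {
  webhdfs {
    host => "{my-ip}"
    port => 9870
    path => "/test/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.csv"
    user => "USER123"
    # Illustrative values (assumption, not tuned): wait longer between retries
    # so the previous writer's HDFS lease has a chance to be released.
    retry_interval => 5
    retry_times => 10
  }
}
```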

After that, whenever I run Logstash it hangs. The pipeline starts successfully but then does nothing. I have shared a screenshot.

Have you tried reducing the number of pipeline workers?
pipeline.workers: 1
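Besides pipeline.workers: 1, the webhdfs output documents a single_file_per_thread option. It is meant to stop several output threads from appending to the same file and contending for the same HDFS lease. When the option is enabled, the path must include %{[@metadata][thread_id]}. The sketch below assumes a plugin version that supports this option. It only separates writers inside one Logstash process, so it is not expected to help when the lease is held by some other client.

```
output {
  webhdfs {
    host => "{my-ip}"
    port => 9870
    user => "USER123"
    # Each output thread writes to its own file, so no two threads
    # try to APPEND to the same HDFS path at the same time.
    single_file_per_thread => true
    # Required when single_file_per_thread is true: a per-thread part in the path.
    path => "/test/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}-%{[@metadata][thread_id]}.csv"
  }
}
```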

Yes, but nothing changes. I also tried creating the file in HDFS first and then writing to it. The pipeline still hangs.

Have you restarted LS after setting the value?

Yes, I restarted both HDFS and Logstash. Same problem.

When I just run logstash like this, it gives this error.

Your pipelines.yml does not have the correct parameters. Please review it.

Also, sincedb_path => "NULL" persists the in-memory sincedb to a file called NULL in the working directory of Logstash. If you do not want it persisted across restarts, set sincedb_path => "NUL" instead (NUL is the Windows null device).
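Here is the same suggestion applied to the original file input. Because NUL discards whatever is written to it, no read position is kept between runs. The file is therefore read from the beginning on every start, which can send the same rows to HDFS again after a restart.

```
input {
  file {
    path => "C:/Users/user/Desktop/test.csv"
    start_position => "beginning"
    # "NUL" is the Windows null device: the read position is not persisted,
    # so the whole file is re-read each time Logstash starts.
    # ("NULL" would instead create a real file named NULL.)
    sincedb_path => "NUL"
  }
}
```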

I re-ran the config today. The file gets created with only one row in it, followed by this exception.

This is my logstash.yml:

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
pipeline.workers: 1
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: Enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" automatically enables ordering if the 'pipeline.workers' setting
# is also set to '1', and disables otherwise.
# "true" enforces ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" disables any extra processing necessary for preserving ordering.
#
# pipeline.ordered: auto
#
# Sets the pipeline's default value for `ecs_compatibility`, a setting that is
# available to plugins that implement an ECS Compatibility mode for use with
# the Elastic Common Schema.
# Possible values are:
# - disabled
# - v1
# - v8 (default)
# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a
# migrated pipeline continues to operate as it did before your upgrade, opt-OUT
# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting
# it here will set the default for _all_ pipelines, including new ones.
#
# pipeline.ecs_compatibility: v8
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# config.reload.automatic: false
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60) 
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
#
# api.enabled: true
#
# By default, the HTTP API is not secured and is therefore bound to only the
# host's loopback interface, ensuring that it is not accessible to the rest of
# the network.
# When secured with SSL and Basic Auth, the API is bound to _all_ interfaces
# unless configured otherwise.
#
# api.http.host: 127.0.0.1
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
# api.http.port: 9600-9700
#
# The HTTP API includes a customizable "environment" value in its response,
# which can be configured here.
#
# api.environment: "production"
#
# The HTTP API can be secured with SSL (TLS). To do so, you will need to provide
# the path to a password-protected keystore in p12 or jks format, along with credentials.
#
# api.ssl.enabled: false
# api.ssl.keystore.path: /path/to/keystore.jks
# api.ssl.keystore.password: "y0uRp4$$w0rD"
#
# The HTTP API can be configured to require authentication. Acceptable values are
#  - `none`:  no auth is required (default)
#  - `basic`: clients must authenticate with HTTP Basic auth, as configured
#             with `api.auth.basic.*` options below
# api.auth.type: none
#
# When configured with `api.auth.type` `basic`, you must provide the credentials
# that requests will be validated against. Usage of Environment or Keystore
# variable replacements is encouraged (such as the value `"${HTTP_PASS}"`, which
# resolves to the value stored in the keystore's `HTTP_PASS` variable if present
# or the same variable from the environment)
#
# api.auth.basic.username: "logstash-user"
# api.auth.basic.password: "s3cUreP4$$w0rD"
#
# When setting `api.auth.basic.password`, the password should meet
# the default password policy requirements.
# The default password policy requires non-empty minimum 8 char string that
# includes a digit, upper case letter and lower case letter.
# Policy mode sets Logstash to WARN or ERROR when HTTP authentication password doesn't
# meet the password policy requirements.
# The default is WARN. Setting to ERROR enforces stronger passwords (recommended).
#
# api.auth.basic.password_policy.mode: WARN
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have a label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Default is memory
#
# queue.type: memory
#
# If `queue.type: persisted`, the directory path where the pipeline data files will be stored.
# Each pipeline will group its PQ files in a subdirectory matching its `pipeline.id`.
# Default is path.data/queue.
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.
# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.
# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.
#
# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
# expired the events could be automatically deleted from the DLQ.
# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
# to represent a five days interval.
# The available units are respectively d, h, m, s for day, hours, minutes and seconds.
# If not specified then the DLQ doesn't use any age policy for cleaning events.
#
# dead_letter_queue.retain.age: 1d

# If using dead_letter_queue.enable: true, defines the action to take when the dead_letter_queue.max_bytes is reached,
# could be "drop_newer" or "drop_older".
# With drop_newer, messages that were inserted most recently are dropped, logging an error line.
# With drop_older setting, the oldest messages are dropped as new ones are inserted.
# Default value is "drop_newer".
# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
#
# ------------ Other Settings --------------
#
# Allow or block running Logstash as superuser (default: true)
# allow_superuser: false
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.monitoring.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx
#xpack.management.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

# X-Pack GeoIP plugin
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update
#xpack.geoip.download.endpoint: "https://geoip.elastic.co/v1/database"

Please do not post pictures of text. They are hard to read and impossible to search.

Please find the error below.

[2022-11-02T12:00:36,564][WARN ][logstash.outputs.webhdfs ][main] webhdfs write caused an exception:
{"RemoteException":{"exception":"IOException","javaClassName":"java.io.IOException",
"message":"All datanodes [DatanodeInfoWithStorage[127.0.0.1:9866,DS-a9c68d8e-dfaa-4eda-b023-27d647d33fc6,DISK]] are bad. Aborting..."}}. Maybe you should increase retry_interval or reduce number of workers. Retrying...
[2022-11-02T12:00:36,590][WARN ][logstash.outputs.webhdfs ][main] webhdfs write caused an exception:
{"RemoteException":{"exception":"AlreadyBeingCreatedException",
"javaClassName":"org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException",
"message":"Failed to APPEND_FILE /test/test123.csv for DFSClient_NONMAPREDUCE_420551275_76 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-72586279_72 on 127.0.0.1\r\n
\tat org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2706)\r\n
\tat org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)\r\n
\tat org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2741)\r\n
\tat org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:835)\r\n
\tat org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:503)\r\n
\tat org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)\r\n
\tat org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)\r\n
\tat org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)\r\n
\tat org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)\r\n
\tat org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)\r\n
\tat java.security.AccessController.doPrivileged(Native Method)\r\n
\tat javax.security.auth.Subject.doAs(Subject.java:422)\r\n
\tat org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)\r\n
\tat org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)\r\n"}}. Maybe you should increase retry_interval or reduce number of workers. Retrying...
