I'm using Logstash to receive logs from Filebeat and push them to Elasticsearch. Currently there are errors in the Logstash log, yet Elasticsearch still seems to be receiving data. However, because there is a lot of data flowing through, I haven't found a way to verify whether any data is being lost along the way.
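The only sanity check I can think of is to count documents per day in the target index (kafka_log_test, from the output section further down) and compare that against a line count of the shipped log files for the same day. Here is a minimal sketch of that check in Python, assuming Elasticsearch is reachable on localhost:9200 and using the @datetime field set by the date filter below; the date range is only an example:

import json
import urllib.request

ES = "http://localhost:9200"   # host from the elasticsearch output below
INDEX = "kafka_log_test"       # index from the elasticsearch output below

# Count documents whose parsed timestamp falls on one example day; the result
# can then be compared with `wc -l` over the source log files for that day.
query = {
    "query": {
        "range": {
            "@datetime": {
                "gte": "2018-07-11T00:00:00",
                "lt": "2018-07-12T00:00:00"
            }
        }
    }
}

req = urllib.request.Request(
    "{}/{}/_count".format(ES, INDEX),
    data=json.dumps(query).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.load(resp)["count"])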
Below is the error message; it keeps recurring.
[2018-07-11T16:43:58,748][INFO ][org.logstash.beats.BeatsHandler] [local: 10.176.15.1:5044, remote: 10.176.138.188:59226] Handling exception: failed to allocate 16777216 byte(s) of direct memory (used: 1022622823, max: 1037959168)
[2018-07-11T16:43:58,748][WARN ][io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1022622823, max: 1037959168)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.PoolArena.reallocate(PoolArena.java:397) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:118) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:285) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:265) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1079) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1072) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1062) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
I think my configuration is correct; I've attached my pipeline definitions (pipelines.yml) here.
- pipeline.id: qa_kafka
  path.config: "/home/kafka/tools/logstash-6.2.4/qa_kafka.conf"
- pipeline.id: test_kafka
  path.config: "/home/kafka/tools/logstash-6.2.4/test_kafka.conf"
- pipeline.id: test_zk
  path.config: "/home/kafka/tools/logstash-6.2.4/test_zk.conf"
- pipeline.id: qa_zk
  path.config: "/home/kafka/tools/logstash-6.2.4/qa_zk.conf"
All these pipelines are similar, so I've attached one of them here.
input {
  beats {
    port => "5045"
  }
}
filter {
  grok {
    remove_field => ["message", "tags"]
    match => {
      "message" => "\[(?<log_time>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND},[0-9]{3})\] %{LOGLEVEL:log_type} \[?(?<component>(\[.*\])|(.*?[0-9])|(.*?))\]?(:|,)? ?%{GREEDYDATA:content}"
    }
  }
  grok {
    match => {
      "source" => ".+/(?<log_file>.+\.log)"
    }
  }
  date {
    timezone => "America/Los_Angeles"
    match => ["log_time", "yyyy-MM-dd HH:mm:ss,SSS"]
    target => "@datetime"
  }
  ruby {
    code => 'require "date"
             current_time = DateTime.now
             t = current_time.strftime("%d/%m/%Y %H:%M:%S,%L")
             event.set("@collected_time", t)'
  }
}
output {
  # stdout { codec => rubydebug }
  elasticsearch {
    index => "kafka_log_test"
    hosts => ["localhost:9200"]
  }
}
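Just to illustrate what the filters are meant to do (a hand-worked example for a made-up broker log line, not actual pipeline output): for a line like [2018-07-11 16:43:58,748] INFO [Controller id=1] Starting the controller scheduler, shipped from a file named controller.log, I would expect roughly these fields on the event, with message and tags dropped by remove_field once the first grok matches:

log_time        => "2018-07-11 16:43:58,748"
log_type        => "INFO"
component       => "Controller id=1"
content         => "Starting the controller scheduler"
log_file        => "controller.log"           # from the second grok on "source"
@datetime       => 2018-07-11T23:43:58.748Z   # log_time interpreted as America/Los_Angeles
@collected_time => "11/07/2018 16:45:00,123"  # wall-clock time when the ruby filter ran (example value)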
Below is some information about the Java heap usage of the Logstash process.
PID MAIN-CLASS HPCUR HPMAX NHCUR NHMAX CPU GC VM USERNAME #T DL
29089 gstash.Logstash 522m 989m 173m n/a 0.85% 0.00% O8U13 kafka 194
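One thing I noticed: the direct-memory ceiling in the error (1037959168 bytes, roughly 989 MB) matches HPMAX above. As far as I understand, Netty caps direct memory at the JVM heap maximum when -XX:MaxDirectMemorySize is not set, which would be consistent with the stock Logstash 6.2.4 heap settings in config/jvm.options, roughly:

-Xms1g
-Xmx1g
# no -XX:MaxDirectMemorySize, so direct memory is capped at about the heap max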
Really appreciate your help!