Hi
I am trying to connect a plugin on Elasticsearch with security enabled.
The plugin is having an unrecoverable error and is restarting
portion of logs of logstash
[2022-05-25T18:53:15,071][INFO ][logstash.inputs.tcp ][mikrotik-log][6941e412eeba6d7a7c03e32f344fbdf2555da759a46c27a7cb636d8cc746d53b] Starting tcp input listener {:address=>"0.0.0.0:5514", :ssl_enable=>false}
[2022-05-25T18:53:15,073][WARN ][io.netty.channel.AbstractChannel][mikrotik-log][6941e412eeba6d7a7c03e32f344fbdf2555da759a46c27a7cb636d8cc746d53b] Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x7881ae58]
java.util.concurrent.RejectedExecutionException: event executor terminated
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.bootstrap.AbstractBootstrap.doBind(AbstractBootstrap.java:272) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.bootstrap.AbstractBootstrap.bind(AbstractBootstrap.java:268) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at io.netty.bootstrap.AbstractBootstrap.bind(AbstractBootstrap.java:253) [netty-all-4.1.65.Final.jar:4.1.65.Final]
at org.logstash.tcp.InputLoop.run(InputLoop.java:86) [logstash-input-tcp-6.2.7.jar:?]
at jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441) [jruby.jar:?]
at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305) [jruby.jar:?]
at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32) [jruby.jar:?]
at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_tcp_minus_6_dot_2_dot_7_minus_java.lib.logstash.inputs.tcp.RUBY$method$run$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-tcp-6.2.7-java/lib/logstash/inputs/tcp.rb:160) [jruby.jar:?]
at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$inputworker$0(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:410) [jruby.jar:?]
at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$inputworker$0$VARARGS(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405) [jruby.jar:?]
at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80) [jruby.jar:?]
at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70) [jruby.jar:?]
at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207) [jruby.jar:?]
at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_input$1(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:401) [jruby.jar:?]
at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138) [jruby.jar:?]
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) [jruby.jar:?]
at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52) [jruby.jar:?]
at org.jruby.runtime.Block.call(Block.java:139) [jruby.jar:?]
at org.jruby.RubyProc.call(RubyProc.java:318) [jruby.jar:?]
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105) [jruby.jar:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
[2022-05-25T18:53:15,078][ERROR][logstash.javapipeline ][mikrotik-log][6941e412eeba6d7a7c03e32f344fbdf2555da759a46c27a7cb636d8cc746d53b] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:mikrotik-log
Plugin: <LogStash::Inputs::Tcp port=>5514, tags=>["mikrotik-log"], id=>"6941e412eeba6d7a7c03e32f344fbdf2555da759a46c27a7cb636d8cc746d53b", enable_metric=>true, codec=><LogStash::Codecs::Line id=>"line_6bd18bfc-f160-4af7-b189-df277732e185", enable_metric=>true, charset=>"UTF-8", delimiter=>"\n">, host=>"0.0.0.0", mode=>"server", proxy_protocol=>false, ssl_enable=>false, ssl_verify=>true, ssl_key_passphrase=>, tcp_keep_alive=>false, dns_reverse_lookup_enabled=>true>
Error: event executor terminated
Exception: Java::JavaUtilConcurrent::RejectedExecutionException
Stack: io.netty.util.concurrent.SingleThreadEventExecutor.reject(io/netty/util/concurrent/SingleThreadEventExecutor.java:926)
io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(io/netty/util/concurrent/SingleThreadEventExecutor.java:353)
io.netty.util.concurrent.SingleThreadEventExecutor.addTask(io/netty/util/concurrent/SingleThreadEventExecutor.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor.execute(io/netty/util/concurrent/SingleThreadEventExecutor.java:828)
io.netty.util.concurrent.SingleThreadEventExecutor.execute(io/netty/util/concurrent/SingleThreadEventExecutor.java:818)
io.netty.channel.AbstractChannel$AbstractUnsafe.register(io/netty/channel/AbstractChannel.java:483)
io.netty.channel.SingleThreadEventLoop.register(io/netty/channel/SingleThreadEventLoop.java:87)
io.netty.channel.SingleThreadEventLoop.register(io/netty/channel/SingleThreadEventLoop.java:81)
io.netty.channel.MultithreadEventLoopGroup.register(io/netty/channel/MultithreadEventLoopGroup.java:86)
io.netty.bootstrap.AbstractBootstrap.initAndRegister(io/netty/bootstrap/AbstractBootstrap.java:323)
io.netty.bootstrap.AbstractBootstrap.doBind(io/netty/bootstrap/AbstractBootstrap.java:272)
io.netty.bootstrap.AbstractBootstrap.bind(io/netty/bootstrap/AbstractBootstrap.java:268)
io.netty.bootstrap.AbstractBootstrap.bind(io/netty/bootstrap/AbstractBootstrap.java:253)
org.logstash.tcp.InputLoop.run(org/logstash/tcp/InputLoop.java:86)
jdk.internal.reflect.GeneratedMethodAccessor43.invoke(jdk/internal/reflect/GeneratedMethodAccessor43)
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566)
org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:441)
org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:305)
usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_tcp_minus_6_dot_2_dot_7_minus_java.lib.logstash.inputs.tcp.run(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-tcp-6.2.7-java/lib/logstash/inputs/tcp.rb:160)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.inputworker(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:410)
usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_input(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:401)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:318)
java.lang.Thread.run(java/lang/Thread.java:829)
this is my plugin :
root@elastic:/etc/logstash# cat /etc/logstash/conf.d/mikrotik-log.conf
Input will be the tcp port specified, mikrotik config will be shown later.
input {
tcp {
port => 5514
tags => ["mikrotik-log"]
}
udp {
port => 5514
tags => ["mikrotik-log"]
}
}
the tag mikrotik-log is added by the input
filter {
if "mikrotik-log" in [tags] {
grok {
id => "mikrotik-log-pipeline"
patterns_dir => "/etc/logstash/custom-patterns/"
tag_on_failure => "_grokparsefailure_mikrotik_log"
match => [
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:item} %{DATA:action} by %{DATA:user}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) user %{DATA:user} %{GREEDYDATA:action} from %{IP:host} via %{DATA:method}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user %{DATA:user} from %{IP:host} via %{DATA:method}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user: %{DATA:user}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} (Identity Protection): %{IP:local_address}%{GREEDYDATA}%{IP:remote_address}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}[%{GREEDYDATA}]-%{IP:remote_address}[%{GREEDYDATA}] spi:%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} address for %{IP:remote_address}[%{GREEDYDATA}]$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}[%{GREEDYDATA}]<=>%{IP:remote_address}[%{GREEDYDATA}] spi=%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} %{IP:released_ip} $",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{IP:remote_address} %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_PEER_NOT_COMPLIANT:action}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) the %{MIKROTIK_PACKET_RETRANSMISSION:action} by %{IP:remote_address}[%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:interface} link %{GREEDYDATA:link_state}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_TRAFFIC_FLOW:action} by %{DATA:user}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} to %{DATA:mac_address}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_RELEASED_IP:action} %{IP:released_ip} from %{DATA:mac_address} $",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, signal strength %{INT:signal_strength}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, %{MIKROTIK_DISCO_REASON:disconnect_reason}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{MIKROTIK_WIFI_STATE:wifi_state} $",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol}, %{IP:local_address}:%{INT:src_port}->%{IP:remote_address}:%{INT:dst_port}, len %{INT:length}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol} (%GREEDYDATA}), %{IP:local_address}->%{IP:remote_address}, len %{INT:length}%{GREEDYDATA}$"
]
}
if "_grokparsefailure_mikrotik_log" not in [tags] {
mutate {
remove_field => ["message"]
}
}
}
}
output to all Elasticsearch hosts
output {
if "mikrotik-log" in [tags] {
Elasticsearch {
ssl => true
ssl_certificate_verification => false
user => "elastic"
password => "password"
id => "mikrotik-log-output"
hosts => ["https://192.168.15.38:9200"]
index => "mikrotik-log-%{+YYYY.MM.ww}"
}
}
}