Hi there,
I'm using Logstash to filter NetFlow data and save it to Elasticsearch, which works fine. I also want to filter the data in Elasticsearch and save it to Redis, so I wrote a Logstash pipeline file to do that.
Here is my file
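input {
  # Read already-indexed ElastiFlow documents back out of Elasticsearch.
  elasticsearch {
    id => "input_elasticsearch_netflow_redis"
    # The elasticsearch input takes a `hosts` list of "host:port" entries;
    # it has no separate `host` / `port` settings.
    # NOTE(review): ${port} and ${pwd} are also used by the redis output in
    # this file. Elasticsearch and Redis normally listen on different ports
    # and should not share a password -- use distinct variable names.
    hosts => ["${ELASTIFLOW_NETFLOW_IPV4_HOST}:${port}"]
    index => "elastiflow-3.5.3-2020.06.16"
    # ${...} references are resolved from the Logstash keystore or the
    # environment when the pipeline is loaded, which keeps the credentials
    # out of this file.
    # NOTE(review): no TLS is configured on this input, so the basic-auth
    # credentials would cross the network in cleartext -- enable TLS if the
    # cluster supports it.
    user => "${user}"
    password => "${pwd}"
    # `query` must be a string holding Elasticsearch query DSL, not a hash.
    # NOTE(review): the original passed { "default_field" : [...] }; it is
    # rewritten here as a match_all with a `_source` filter on the assumption
    # that the list was meant to select which fields are returned -- confirm.
    # "flow.dst_prot" in the original looks like a typo for "flow.dst_port"
    # (it sits next to "flow.src_port") -- confirm against the index mapping.
    query => '{
      "query": { "match_all": {} },
      "_source": [
        "netflow.first_switched",
        "netflow.last_switched",
        "flow.packets",
        "flow.bytes",
        "flow.src_addr",
        "flow.dst_addr",
        "flow.src_port",
        "flow.dst_port",
        "flow.tcp_flags",
        "flow.ip_protocol",
        "flow.tos",
        "flow.src_autonomous_system",
        "flow.input_snmp",
        "flow.output_snmp",
        "netflow.src_as",
        "netflow.dst_as"
      ]
    }'
  }
}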
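filter {
  # Start every event with all TCP-flag booleans set to "false"; the
  # conditionals below flip the ones whose flag is present.
  mutate {
    add_field => {
      "has_fin" => "false"
      "has_syn" => "false"
      "has_rst" => "false"
      "has_pst" => "false"
      "has_ack" => "false"
      "has_urg" => "false"
      # was "flase" in the original, which would never compare equal to "false"
      "has_cwe" => "false"
      "has_ece" => "false"
    }
  }

  # Inside a single mutate block the operations run in the plugin's fixed
  # order, not the order written, so split and strip get separate blocks to
  # make strip run on the individual flags after the split.
  mutate {
    split => { "[flow][tcp_flags]" => "," }
  }
  mutate {
    strip => ["[flow][tcp_flags]"]
  }

  # Conditionals are only valid at the filter level, not inside a plugin
  # block, so the mutate above is closed before the first `if`.
  if "FIN" in [flow][tcp_flags] {
    mutate { replace => { "has_fin" => "true" } }
  }
  if "SYN" in [flow][tcp_flags] {
    mutate { replace => { "has_syn" => "true" } }
  }
  if "RST" in [flow][tcp_flags] {
    mutate { replace => { "has_rst" => "true" } }
  }
  # NOTE(review): the standard TCP flag names are PSH and CWR. Confirm the
  # exact strings stored in [flow][tcp_flags]; if they differ from "PUSH" and
  # "CWE", those two conditions never match and the fields stay "false".
  if "PUSH" in [flow][tcp_flags] {
    mutate { replace => { "has_pst" => "true" } }
  }
  if "ACK" in [flow][tcp_flags] {
    mutate { replace => { "has_ack" => "true" } }
  }
  if "URG" in [flow][tcp_flags] {
    mutate { replace => { "has_urg" => "true" } }
  }
  if "CWE" in [flow][tcp_flags] {
    mutate { replace => { "has_cwe" => "true" } }
  }
  if "ECE" in [flow][tcp_flags] {
    mutate { replace => { "has_ece" => "true" } }
  }
}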
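output {
  # Push each filtered event onto a Redis list.
  redis {
    data_type => "list"
    host => "${host}"
    # NOTE(review): ${pwd} and ${port} are the same variables the
    # elasticsearch input in this file uses. Give Redis its own entries so
    # the two services do not share a password or port value.
    # NOTE(review): unless the connection to Redis is encrypted or tunnelled,
    # the password travels in cleartext -- confirm the transport.
    password => "${pwd}"
    # Variable references must be quoted; a bare ${port} is not valid
    # config syntax.
    port => "${port}"
    # Redis database number (default 0).
    db => 0
    # With data_type "list", `key` is the name of the Redis list. A date
    # pattern needs the leading "+": %{+YYYY.MM.dd} formats the event
    # timestamp, whereas %{YYYY.MM.dd} looks up a field with that name.
    key => "elastiflow-%{+YYYY.MM.dd}"
    # Seconds to wait before reconnecting after a failed connection (default 1).
    reconnect_interval => 5
    # Shuffle the host list at Logstash startup (default true).
    shuffle_hosts => true
    # Initial connection timeout in seconds (default 5).
    timeout => 5
    # Send events in batches: up to 50 events, or whatever has accumulated
    # after 5 seconds.
    batch => true
    batch_events => 50
    batch_timeout => 5
  }
}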
After I added this file, I restarted Logstash:
# Reload systemd unit definitions, then restart Logstash so it loads the new pipeline file.
systemctl daemon-reload
systemctl restart logstash
# Follow the Logstash log file as new lines are written.
tail -f /var/log/logstash/logstash-plain.log
and the log showed these errors:
[2020-06-22T14:05:45,836][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:elastiflow, :exception=>"Java::OrgLogstashSecretStore::SecretStoreException::LoadException", :message=>"Found a file at /etc/logstash/logstash.keystore, but it is not a valid Logstash keystore.", :backtrace=>["org.logstash.secret.store.backend.JavaKeyStore.load(JavaKeyStore.java:288)", "org.logstash.secret.store.backend.JavaKeyStore.load(JavaKeyStore.java:60)", "org.logstash.secret.store.SecretStoreFactory.doIt(SecretStoreFactory.java:129)", "org.logstash.secret.store.SecretStoreFactory.load(SecretStoreFactory.java:115)", "org.logstash.secret.store.SecretStoreExt.getIfExists(SecretStoreExt.java:57)", "org.logstash.execution.AbstractPipelineExt.getSecretStore(AbstractPipelineExt.java:437)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:80)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:837)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1169)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1156)", "org.jruby.ir.targets.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:39)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:43)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:82)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:332)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:86)", "org.jruby.RubyClass.newInstance(RubyClass.java:939)", 
"org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:82)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "usr.share.logstash.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:342)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:834)"]}
[2020-06-22T14:05:45,856][ERROR][logstash.agent ] An exception happened when converging configuration {:exception=>LogStash::Error, :message=>"Don't know how to handle `Java::OrgLogstashSecretStore::SecretStoreException::LoadException` for `PipelineAction::Create<elastiflow>`", :backtrace=>["org/logstash/execution/ConvergeResultExt.java:129:in `create'", "org/logstash/execution/ConvergeResultExt.java:57:in `add'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:355:in `block in converge_state'"]}
[2020-06-22T14:05:45,907][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<LogStash::Error: Don't know how to handle `Java::OrgLogstashSecretStore::SecretStoreException::LoadException` for `PipelineAction::Create<elastiflow>`>, :backtrace=>["org/logstash/execution/ConvergeResultExt.java:129:in `create'", "org/logstash/execution/ConvergeResultExt.java:57:in `add'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:355:in `block in converge_state'"]}
[2020-06-22T14:05:45,919][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
After these errors, no matter how many times I restarted Logstash, nothing more was written to that log file, but in
journalctl -n -u logstash
a lot of log lines keep coming out continuously:
Jun 23 14:56:10 dev-elastiflow logstash[100390]: [ERROR] 2020-06-23 14:56:10.375 [main] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
Jun 23 14:56:10 dev-elastiflow systemd[1]: logstash.service: Main process exited, code=exited, status=1/FAILURE
Jun 23 14:56:10 dev-elastiflow systemd[1]: logstash.service: Failed with result 'exit-code'.
Jun 23 14:56:10 dev-elastiflow systemd[1]: logstash.service: Service hold-off time over, scheduling restart.
Jun 23 14:56:10 dev-elastiflow systemd[1]: logstash.service: Scheduled restart job, restart counter is at 92.
Jun 23 14:56:10 dev-elastiflow systemd[1]: Stopped logstash.
Jun 23 14:56:10 dev-elastiflow systemd[1]: Started logstash.
Jun 23 14:56:10 dev-elastiflow logstash[100450]: OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Jun 23 14:56:12 dev-elastiflow logstash[100450]: WARNING: An illegal reflective access operation has occurred
Jun 23 14:56:12 dev-elastiflow logstash[100450]: WARNING: Illegal reflective access by com.headius.backport9.modules.Modules (file:/usr/share/logstash/logstash-core/lib/jars/jruby-complete-9.2.11.1.jar) to method sun.nio.ch.NativeThread.signal(long)
Jun 23 14:56:12 dev-elastiflow logstash[100450]: WARNING: Please consider reporting this to the maintainers of com.headius.backport9.modules.Modules
Jun 23 14:56:12 dev-elastiflow logstash[100450]: WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
Jun 23 14:56:12 dev-elastiflow logstash[100450]: WARNING: All illegal access operations will be denied in a future release
Jun 23 14:56:25 dev-elastiflow logstash[100450]: ERROR: Failed to load settings file from "path.settings". Aborting... path.setting=/etc/logstash, exception=Java::OrgLogstashSecretStore::SecretStoreException::LoadException, message=>Found a file at /etc/logstash/logstash.keystore, but it is not a valid Logstash keystore.
Jun 23 14:56:25 dev-elastiflow logstash[100450]: [ERROR] 2020-06-23 14:56:25.255 [main] Logstash - java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
Jun 23 14:56:25 dev-elastiflow systemd[1]: logstash.service: Main process exited, code=exited, status=1/FAILURE
Jun 23 14:56:25 dev-elastiflow systemd[1]: logstash.service: Failed with result 'exit-code'.
Jun 23 14:56:25 dev-elastiflow systemd[1]: logstash.service: Service hold-off time over, scheduling restart.
Jun 23 14:56:25 dev-elastiflow systemd[1]: logstash.service: Scheduled restart job, restart counter is at 93.
Jun 23 14:56:25 dev-elastiflow systemd[1]: Stopped logstash.
Jun 23 14:56:25 dev-elastiflow systemd[1]: Started logstash.
Jun 23 14:56:25 dev-elastiflow logstash[100510]: OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Jun 23 14:56:26 dev-elastiflow logstash[100510]: WARNING: An illegal reflective access operation has occurred
Jun 23 14:56:26 dev-elastiflow logstash[100510]: WARNING: Illegal reflective access by com.headius.backport9.modules.Modules (file:/usr/share/logstash/logstash-core/lib/jars/jruby-complete-9.2.11.1.jar) to method sun.nio.ch.NativeThread.signal(long)
Jun 23 14:56:26 dev-elastiflow logstash[100510]: WARNING: Please consider reporting this to the maintainers of com.headius.backport9.modules.Modules
Does anyone know how to solve these problems?
Thanks.
Kase