Hello,
I am using ELK (version 6.2.4) and I would like to collect Fortigate firewall logs forwarded from another SIEM, so I took the following steps:
- I configured the other SIEM to forward these logs to ELK over UDP on port 514, in raw payload format
- I verified with tcpdump that the logs arrive on the network interface; they do (see the command below)
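For reference, the check looked roughly like this (eth0 is a placeholder for my actual interface):

tcpdump -i eth0 -n -A udp port 514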
- I created a conf file (syslog.conf) as follows:
input {
  udp {
    port => 514
    type => syslog
  }

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "user"
    password => "password"
    index => "fortigate-%{+YYYY.MM.dd}"
  }
}
But the index is not created, and I get the following error:
[logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:fortigate, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 8, column 6 (byte 71) after input {\n udp {\n port => 514\n type => syslog\n }\n\nfilter {\n if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:348:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}
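Reading the error, the parser seems to fail exactly where filter { begins, so I wonder whether I am simply missing a closing } for the input block, i.e. whether it should read:

input {
  udp {
    port => 514
    type => syslog
  }
}

Is that the only problem, or is there more to fix?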
- I would like to know how to parse these logs so that the index gets created, and whether there are other ways to do it.
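Since the Fortigate messages are key=value formatted, I was also wondering whether I should replace the grok with a kv filter, along these lines (just a sketch, not yet tested against my data):

filter {
  kv {
    source => "message"
    field_split => " "
    value_split => "="
  }
}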
For information, I already collect NetFlow with ElastiFlow and Windows logs with Winlogbeat, each in its own separate pipeline.
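For context, my pipelines.yml is organised roughly like this (the paths here are illustrative, not my exact ones):

- pipeline.id: elastiflow
  path.config: "/etc/logstash/elastiflow/conf.d/*.conf"
- pipeline.id: winlogbeat
  path.config: "/etc/logstash/winlogbeat.conf"
- pipeline.id: fortigate
  path.config: "/etc/logstash/syslog.conf"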