Parse firewall logs collected by Logstash through syslog

Hello,

I am using ELK (version 6.2.4), and I would like to collect firewall logs (Fortigate) from another SIEM, so I took the following steps:

  • I configured the other SIEM to forward these logs to ELK over UDP: port 514, in payload format
  • I checked with tcpdump that the logs are received on the network interface; they are
  • I created a conf file (syslog.conf) as follows:

input {
  udp {
    port => 514
    type => syslog
  }

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "user"
    password => "password"
    index => "fortigate-%{+YYYY.MM.dd}"
  }
}

But the index is not created, and I get the following error:

[logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:fortigate, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 8, column 6 (byte 71) after input {\n udp {\n port => 514\n type => syslog\n }\n\nfilter {\n if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:348:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

  • I would like to know how to parse these logs so the index gets created, and whether there are other ways to do it.
    For information, I already collect NetFlow with ElastiFlow and Windows logs with Winlogbeat, which use separate pipelines.

You're not closing the input block with }.
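
For reference, the input section with the missing brace added (everything else unchanged):

input {
  udp {
    port => 514
    type => syslog
  }
}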

Thank you, I'm going to change that and try again.
I will keep you informed of the result.

Hello,

Everything seems OK now, but this time I get the following error:

[INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"firewall", :thread=>"#<Thread:0x59e8c2a8@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[INFO ][logstash.agent ] Pipelines running {:count=>4, :pipelines=>[".monitoring-logstash", "beats", "elastiflow", "firewall"]}
[INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:514"}
[WARN ][logstash.inputs.udp ] UDP listener died {:exception=>#<SocketError: bind: name or service not known>, :backtrace=>["org/jruby/ext/socket/RubyUDPSocket.java:200:in `bind'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-udp-3.3.3/lib/logstash/inputs/udp.rb:102:in `udp_listener'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-udp-3.3.3/lib/logstash/inputs/udp.rb:58:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:514:in `inputworker'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:507:in `block in start_input'"]}

Thank you for your help

Unless you run Logstash as root, it can't listen on port 514. You can, however, use iptables to redirect that port to one that Logstash can listen on. Does it work if you pick another port (>1024)?
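
A minimal sketch of such a redirect (assuming the iptables NAT table is available; adjust the target port to your setup):

# Redirect UDP syslog traffic arriving on the privileged port 514
# to a port Logstash can bind to as a non-root user.
iptables -t nat -A PREROUTING -p udp --dport 514 -j REDIRECT --to-port 5044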

Thank you very much. I redirected port 514 to port 5044, and it worked: I can now view the firewall logs in Kibana. However, I would like to know how to parse them with Logstash; I currently receive them as a raw message (type, @timestamp, message).
I tried to parse them with the Grok Debugger and managed to match them with this pattern:

time=%{TIME:time} devname=%{HOST:hostname} devid=%{HOST:devid} logid=%{NUMBER:logid} type=%{WORD:type} subtype=%{WORD:subtype} level=%{WORD:level} vd=%{WORD:vdom} srcip=%{IP:srcip} srcport=%{NUMBER:srcport} srcintf="%{HOST:srcintf}" dstip=%{IP:dstip} dstport=%{NUMBER:dstport} dstintf="%{WORD:dstintf}" sessionid=%{NUMBER:sessionid} proto=%{NUMBER:proto} action=%{WORD:action} policyid=%{DATA:policyid} dstcountry="%{WORD:dstcountry}" srccountry="%{WORD:srccountry}" trandisp=%{WORD:trandisp} service="%{WORD:service}" duration=%{NUMBER:duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentpkt} rcvdpkt=%{INT:rcvdpkt} vpn="%{WORD:vpn}" vpntype=%{GREEDYDATA:vpntype} shapersentname="%{WORD:shapersentname}" shaperdropsentbyte=%{INT:shaperdropsentbyte}

Thank you in advance

Use a kv filter instead.
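
For example, a minimal sketch (the options shown are common starting points to tune, not a drop-in config):

filter {
  kv {
    source => "message"   # parse the raw syslog line
    field_split => " "    # key=value pairs are separated by spaces
    value_split => "="    # keys and values are separated by =
  }
}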

I modified the filter section as follows:

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
  kv {}
}
It identifies some fields and adds them, but it doesn't parse the whole message.

What does a processed message look like then? Use a stdout { codec => rubydebug } output.
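
For example, temporarily alongside (or instead of) the elasticsearch output:

output {
  stdout { codec => rubydebug }
}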

Hello,

In fact, I have to explain a little more.
So, the structure of the logs I receive is not identical everywhere.
I receive the raw firewall logs of Palo Alto and Fortigate,
and I also receive the WAF logs of F5 and Deny All,
and I created a single configuration file, named "firewall.conf", to receive all of that.
That's why some messages are partially parsed by kv and others are not. Below is an extract of each one's logs to show what its structure looks like. What interests me most is the firewall logs.

Palo Alto: the fields are separated by |
|usrName=|SourceUser=|DestinationUser=|Application=not-applicable|

Fortigate: the fields are separated by spaces
date=2018-06-04 time=12:49:09 devname=FG-B

Thank you in advance

I think I need to create a configuration file for each one, but since they are all sent to port 5044, I don't know how Logstash will tell that logs are from Fortigate and not from Palo Alto in order to parse them differently.
Do I have to specify the device name in the configuration file?

Hey dora, here Palo Alto logs are in CSV format, and to specify the port on which to receive the Palo Alto logs, you can use the following:

input {
  syslog {
    timezone => "Brazil/East"
    port => "5044"
    type => "syslog"
  }
}
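
If they do arrive as CSV, a csv filter could then split the fields. A minimal sketch (the column names below are hypothetical placeholders, not the actual PAN-OS field order):

filter {
  csv {
    source => "message"
    # Hypothetical column names for illustration only; replace them
    # with the real field order of your PAN-OS log type.
    columns => ["receive_time", "serial", "log_type", "src", "dst"]
  }
}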

For the Fortigate logs, OSSEC has built-in decoders for them... I think Wazuh could be useful in your environment.
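
As for telling the sources apart in a single pipeline, one option is to route on the message shape with conditionals. A sketch (the tests below are assumptions based on the samples posted above, not verified patterns):

filter {
  # Assumption: Fortigate lines contain key=value pairs such as devname=...
  if [message] =~ /devname=/ {
    kv { source => "message" }
  }
  # Assumption: the Palo Alto sample above uses |-separated fields
  else if [message] =~ /\|/ {
    mutate { add_tag => ["paloalto"] }
  }
}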
