Parse firewall logs collected by logstash through syslog


#1

Hello,

I am using ELK (version 6.2.4), and I would like to collect firewall logs (Fortigate) from another SIEM, so I followed the following steps:

  • I configured the other SIEM to forward these logs to ELK via the UDP protocol: port 514 in payload format
  • I checked if the logs are received on the network interface with tcpdump, they are received
  • I created a conf file (syslog.conf) as follows:

input {
** udp {**
** port => 514**
** type => syslog**
** }**

filter {
** if [type] == "syslog" {**
** grok {**
** match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }**
** add_field => [ "received_at", "%{@timestamp}" ]**
** add_field => [ "received_from", "%{host}" ]**
** }**
** date {**
** match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]**
** }**
** }**
}

output {
** elasticsearch {**
** hosts => ["localhost:9200"]**
** user => "user"**
** password => "password"**
** index => "fortigate-%{+YYYY.MM.dd}"**
** }**
}

But the index is not created and i have the following error:

[logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:fortigate, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 8, column 6 (byte 71) after input {\n udp {\n port => 514\n type => syslog\n }\n\nfilter {\n if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:incompile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in block in compile_sources'", "org/jruby/RubyArray.java:2486:inmap'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:ininitialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:169:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:inexecute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:315:in block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:inwith_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:312:in block in converge_state'", "org/jruby/RubyArray.java:1734:ineach'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:299:in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:inblock in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:inconverge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:348:inblock in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

  • I would like to know please how to parse these logs and be able to create the index, and if there are other ways to do it.
    For information, I already collect the netflow with elastiflow and windows logs with winlogbeat whose pipelines are different.

(Magnus Bäck) #2

You're not closing the input block with }.


#3

Thank you, I'm going to change that and try again.
I will keep you informed of the result.


#4

Hello,

Everything seems ok now but this time I have the following error:

[INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"firewall", :thread=>"#<Thread:0x59e8c2a8@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[INFO ][logstash.agent ] Pipelines running {:count=>4, :pipelines=>[".monitoring-logstash", "beats", "elastiflow", "firewall"]}
[INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:514"}
[WARN ][logstash.inputs.udp ] UDP listener died {:exception=>#<SocketError: bind: name or service not known>, :backtrace=>["org/jruby/ext/socket/RubyUDPSocket.java:200:in bind'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-udp-3.3.3/lib/logstash/inputs/udp.rb:102:inudp_listener'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-udp-3.3.3/lib/logstash/inputs/udp.rb:58:in run'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:514:ininputworker'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:507:in `block in start_input'"]}

Thank you for your help


(Magnus Bäck) #5

Unless you run Logstash as root it can't listen on port 514. You can however use iptables to redirect that port to a port that Logstash can listen on. Does it work if you pick another port (>1024)?


#6

Thank you very much, I actually redirected port 514 to port 5044, it worked and I was able to view the firewall logs on kibana. However, I would like to know please how to parse them with logstash, I receive them as a raw message (type, @timestamp, message)
I tried to parse them with "Grok debbuger" and I managed to do it with that:

time=%{TIME:time} devname=%{HOST:hostname} devid=%{HOST:devid} logid=%{NUMBER:logid} type=%{WORD:type} subtype=%{WORD:subtype} level=%{WORD:level} vd=%{WORD:vdom} srcip=%{IP:srcip} srcport=%{NUMBER:srcport} srcintf="%{HOST:srcintf}" dstip=%{IP:dstip} dstport=%{NUMBER:dstport} dstintf="%{WORD:dstintf}" sessionid=%{NUMBER:sessionid} proto=%{NUMBER:proto} action=%{WORD:action} policyid=%{DATA:policyid} dstcountry="%{WORD:dstcountry}" srccountry="%{WORD:srccountry}" trandisp=%{WORD:trandisp} service="%{WORD:service}" duration=%{NUMBER,duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentpkt} rcvdpkt=%{INT:rcvdpkt} vpn="%{WORD:vpn}" vpntype=%{GREEDYDATA:vpntype} shapersentname="%{WORD:shapersentname}" shaperdropsentbyte=%{INT:shaperdropsentbyte}

thank you in advance


(Magnus Bäck) #7

Use a kv filter instead.


#8

I modified the filter section as follows:

filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
kv {}

}
it could identify some fields and add them but it doesn't parse all the message


(Magnus Bäck) #9

What does a processed message look like then? Use a stdout { codec => rubydebug } output.


#10

Hello,

In fact, I have to explain a little more.
So, the structure of the logs that I receive is not identical everywhere.
I get the raw firewall logs of: PaloAlto and Fortigate
and I also get the WAF logs: F5 and Deny All.
and I created only one configuration file that I named "firewall.conf" to receive all that.
That's why there are messages that are partially parsed and others not with kv. I show below an extract of each one's logs to see what his structure looks like. What interests me most is the firewall logs

PaloAlto: the fields are separated by |
|usrName=|SourceUser=|DestinationUser=|Application=not-applicable|

Foritgate: the fields are separated by spaces
date=2018-06-04 time=12:49:09 devname=FG-B

thank you in advance


#11

I think I need to create a configuration file for each one, but I don't know how logstash will know that it's fortigate logs and not Paloalto ones knowing that they are all sent to port 5044, to parse them differently.
Do I have to specify the name of the device on the configuration file?


(Jonhnathan) #12

Hey dora, here palo alto logs are in csv format, and to specify on which port we are going to receive the palo alto logs, u can use the following:

input {
syslog {
timezone => "Brazil/East"
port => "5044"
type => "syslog"
}
}

for the fortigate, ossec has built-in decoders for them... i think wazuh could be useful on your enviroment.


(system) #13

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.