Unable to parse my Fortigate Logs

Hi Team,

I am new to ELK. I have been trying to parse my Fortigate firewall logs with my own regex patterns.

  1. I have Fortigate logs saved on my machine.
  2. I have created a patterns directory which contains all of my regex patterns.
  3. I have a configuration file whose input section says which file should be read and from what position, and which applies a grok filter pointing at the patterns directory, followed by my match sequence.
  4. The output section says the result should be printed to the CLI.

The configuration test passes, but I am unable to see any output in my console. Please help me resolve this.

Console output:

logstash@ubuntu:/usr/share/logstash$ sudo bin/logstash -f /etc/logstash/conf.d/logstash1.conf
[sudo] password for logstash:
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2018-07-06 04:03:09.976 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2018-07-06 04:03:11.211 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.3.0"}
[INFO ] 2018-07-06 04:03:20.873 [Converge PipelineAction::Create] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2018-07-06 04:03:22.157 [Converge PipelineAction::Create] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x1b7776cc run>"}
[INFO ] 2018-07-06 04:03:22.505 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2018-07-06 04:03:23.214 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9601}


I have a configuration file whose input section says which file should be read and from what position, and which applies a grok filter pointing at the patterns directory, followed by my match sequence.

Show your configuration, don't describe it. Details are everything!

Hi Magnus,

This is my configuration file:

input {
  file {
    path => "/home/logstash/Downloads/attachments/Forti.log"
    type => "Forti"
  }
}

filter {
  if [type] == "Forti" {
    grok {
      patterns_dir => ["/usr/share/logstash/patterns"]
      match => { "message" => '%{DATE:Date} %{TIME} %{TYPE:ftgtype} %{ACTION:UtmAction} %{SOURCEIP:SrcIP} %{SPORT:SrcPort} %{DIP:DstIP} %{DPORT:DstPort} %{SERVICE} %{DURATION} %{SBYTE:SentBytes} %{RBYTE:RcvdBytes} %{SESSIONID} %{SSID}' }
    }
  }
}

output {
  if [type] == "Forti" {
    #elasticsearch { hosts => ["localhost:9200"] }
    stdout { codec => rubydebug }
  }
}

Patterns file:

DATE (?<=date=)([0-9a-zA-Z]+[-]){2}[0-9]+
TIME ([0-9]+:){2}[0-9]+
TYPE (?<=\btype=)[a-zA-Z]*
ACTION (?<=action=)[a-zA-Z]*
SOURCEIP (?<=srcip=)(\d+.){3}\d+
SPORT (?<=srcport=)[\d]{5}
DIP (?<=dstip=)(\d+.){3}\d+
DPORT (?<=dstport=)[\d]{0,5}
SERVICE (?<=service=)[a-zA-Z]*
DURATION (?<=duration=)\d*
SBYTE (?<=sentbyte=)\d*
RBYTE (?<=rcvdbyte=)\d*
SESSIONID (?<=sessionid=)\d*
SSID (?<=shaperperipname=)[a-zA-Z-]+\s?

Please let me know what should be changed.

If you want new log files to be read from the start you need start_position => "beginning" in your file input. Additionally you need to understand how Logstash uses sincedb files to track the current position in the input files. See the file input documentation and read the countless past posts about this and how you can disable sincedb by setting the sincedb_path option.
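For illustration, a minimal sketch of such a file input (reusing the path from your post) could look like the following; note that sincedb_path => "/dev/null" throws away the position bookkeeping and is normally only appropriate while testing:

input {
  file {
    path => "/home/logstash/Downloads/attachments/Forti.log"
    type => "Forti"
    # Read files Logstash has not seen before from the beginning instead of only tailing new lines
    start_position => "beginning"
    # Testing only: no sincedb file, so the whole file is re-read on every restart
    sincedb_path => "/dev/null"
  }
}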

Hi @Kaviarasan,

Try this configuration file; it's a complete, working config file for Fortigate firewall logs.

input {
  udp {
    port => 7000
    type => "forti_log"
    tags => ["location_a"]
  }
}

filter {
  # The Fortigate syslog contains a type field as well; we'll need to rename that field in order for this to work
  if [type] == "forti_log" {

    grok {
      match => ["message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}"]
      overwrite => [ "message" ]
      tag_on_failure => [ "forti_grok_failure" ]
    }

    kv {
      source => "message"
      value_split => "="
      # Expects that you have CSV output enabled on your Fortigate. If not, I think you'll have to change this to " ", but I didn't test that.
      field_split => ","
    }

    mutate {
      # I want to use the timestamp inside the logs instead of Logstash's timestamp, so we'll first create a new field containing the date and time fields from the syslog before we convert that to the @timestamp field
      add_field => { "temp_time" => "%{date} %{time}" }
      # The syslog contains a type field which messes with the Logstash type field, so we have to rename it.
      rename => { "type" => "ftg_type" }
      rename => { "subtype" => "ftg_subtype" }
      add_field => { "type" => "forti_log" }
      convert => { "rcvdbyte" => "integer" }
      convert => { "sentbyte" => "integer" }
    }

    date {
      match => [ "temp_time", "yyyy-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      target => "@timestamp"
    }

    mutate {
      # Add/remove fields as you see fit.
      remove_field => ["syslog_index","syslog5424_pri","path","temp_time","service","date","time","sentpkt","rcvdpkt","log_id","message","poluuid"]
    }
  }
}

output {
  stdout { codec => rubydebug }
  if [type] == "forti_log" {
    elasticsearch {
      hosts => "localhost:9200"
      http_compression => "true"
      index => "forti-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "elastic"
    }
  }
}

Change some fields to match your setup and let me know if it's working or not.

Thanks & Regards,
Krunal.

Hi @magnusbaeck,

I have made the changes to the configuration file as below:

input {
  file {
    path => "/home/logstash/Downloads/attachments/Forti.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "Forti"
  }
}

filter {
  if [type] == "Forti" {
    grok {
      patterns_dir => ["/usr/share/logstash/patterns"]
      match => { "message" => '%{DATE:Date} %{TIME} %{TYPE:ftgtype} %{ACTION:UtmAction} %{SOURCEIP:SrcIP} %{SPORT:SrcPort} %{DIP:DstIP} %{DPORT:DstPort} %{SERVICE} %{DURATION} %{SBYTE:SentBytes} %{RBYTE:RcvdBytes} %{SESSIONID} %{SSID}' }
      overwrite => [ "message" ]
    }
  }
}

output {
  if [type] == "Forti" {
    #elasticsearch { hosts => ["localhost:9200"] }
    stdout { codec => rubydebug }
  }
}

The output is,

{
"tags" => [
[0] "_grokparsefailure"
],
"@version" => "1",
"@timestamp" => 2018-07-10T14:36:27.574Z,
"message" => "date=2018-06-29 time=15:58:32 itime=1530267143 logver=52 logid=13 type=traffic subtype=forward level=notice vd=root devid=FG3K2C3Z13800218 action=accept trandisp=snat srcip=172.22.66.87 srcport=51289 dstip=119.81.38.204 dstport=53 service=DNS proto=17 duration=60 policyid=259 sentbyte=146 rcvdbyte=332 sentpkt=1 rcvdpkt=1 srcintf=port14 dstintf=port18 sessionid=97365788 app=DNS appcat=Not.Scanned shaperperipname=Aruba-HP-Students transip=14.139.181.229 transport=51289 dstcountry=Singapore applist=Student srccountry=Reserved poluuid=5a671458-9a0d-51e7-5dd8-c1fe28080ec4",
"path" => "/home/logstash/Downloads/attachments/Forti.log",
"type" => "Forti",
"host" => "ubuntu"
}

I need the values that I name in my grok pattern to come out as separate fields in the index. What am I missing here?

Your grok expression clearly doesn't match your data. In fact, don't use a grok filter here. It's a list of key/value pairs so a kv filter is more appropriate.
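To make that concrete: the custom patterns expect Date, Time, type, action and so on to sit next to each other separated by single spaces, but the real message has many other key=value pairs (itime, logver, logid, ...) in between, so the combined regex can never match. A minimal kv-based sketch (the field names such as srcip and sentbyte come straight from the sample line above) could look like this:

filter {
  kv {
    source => "message"   # the raw Fortigate line, e.g. "... srcip=172.22.66.87 srcport=51289 ..."
    field_split => " "    # pairs are separated by spaces
    value_split => "="    # keys and values are separated by "="
  }
}

This produces one event field per key (srcip, srcport, dstip, dstport, sentbyte, rcvdbyte, ...) without any custom patterns.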

This is working fine:

input {
  file {
    type => "fortigate"
    path => "/home/logstash/Downloads/attachments/Forti.log"
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}

filter {
  if [type] == "fortigate" {

    grok {
      match => ["message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}"]
      #overwrite => [ "message" ]
      tag_on_failure => [ "failure_grok_fortigate" ]
    }

    kv {
      value_split => "="
    }

    mutate {
      #I want to use the timestamp inside the logs instead of Logstash's timestamp, so we'll first create a new field containing the date and time fields from the syslog before we convert that to the @timestamp field
      add_field => { "temp_time" => "%{date} %{time}" }
      #add_field => { "Desti_Country" => "%{dstip}" }
      #The syslog contains a type field which messes with the Logstash type field, so we have to rename it.
      rename => { "type" => "ftg_type" }
      #rename => { "ip" => "Desti_IP" }
      rename => { "subtype" => "ftg_subtype" }
      #add_field => { "type" => "forti_log" }
      convert => { "rcvdbyte" => "integer" }
      convert => { "sentbyte" => "integer" }
    }

    date {
      match => [ "temp_time", "yyyy-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      target => "@timestamp"
    }

    geoip {
      source => "dstip"
      add_field => [ "[geoip][desti_ip]", "%{[geoip][ip]}" ]
    }

    mutate {
      #Add/remove fields as you see fit.
      remove_field => ["syslog_index","sessionid","dstcountry","dstip","transip","country_code3","region_code","country_code2","syslog5424_pri","transport","appcat","srccountry","dstintf","devid","@version","itime","path","logver","logid","vd","host","srcintf","trandisp","location","date","time","service","temp_time","tags","sentpkt","rcvdpkt","log_id","message","poluuid"]

      remove_field => "[geoip][longitude]"
      remove_field => "[geoip][region_code]"
      remove_field => "[geoip][country_code3]"
      remove_field => "[geoip][continent_code]"
      remove_field => "[geoip][country_code2]"
      remove_field => "[geoip][latitude]"
      remove_field => "[geoip][location]"
      remove_field => "[geoip][region_name]"
      remove_field => "[geoip][ip]"
    }
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => "localhost:9200"
    #http_compression => "true"
    index => "forti-%{+YYYY.MM.dd}"
  }
}
