Logstash 2.4 -- Check if two [tags] are present, then output

Hello, I would like some information.

I'm having a little difficulty with my Logstash config. I would like to "filter" on whether traffic comes and goes within the internal network: if traffic both comes from and stays inside the internal network, output it to one specific index; if it comes from inside (it will always originate internally...) and goes outside, or vice versa, output it to another index.

BUT I don't want them to "overlap".

Here's an example of the config I'm running (I'll strip out the unimportant parts):

03-syslog.conf

filter {
  if [type] == "syslog" {
    if "devname" in [message] {
      mutate {
        add_tag => [ "COUPEFEU", "FORTIGATE" ]
      }
    }

...........................................
    if "FORTIGATE" in [tags] {
      grok {
        match => [ "message", "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:kv}" ]
        remove_field => [ "message", "syslog_timestamp" ]
      }

      syslog_pri { }

      kv {
        source => "kv"
        exclude_keys => [ "type", "subtype" ]
        field_split => " "
        value_split => "="
      }

      date {
        match => [ "logtimestamp", "ISO8601" ]
        locale => "en"
        timezone => "America/Montreal"
        remove_field => [ "logtimestamp" ]
      }

      mutate {
        convert => [ "rcvdbyte", "integer" ]
        convert => [ "countdlp", "integer" ]
        convert => [ "countweb", "integer" ]
        convert => [ "countav", "integer" ]
        convert => [ "countemail", "integer" ]
        convert => [ "countips", "integer" ]
        convert => [ "duration", "integer" ]
        convert => [ "sentpkt", "integer" ]
        convert => [ "rcvdpkt", "integer" ]
        convert => [ "sentbyte", "integer" ]
        convert => [ "shaperdroprcvdbyte", "integer" ]
        convert => [ "shaperdropsentbyte", "integer" ]
        convert => [ "filesize", "integer" ]
        convert => [ "count", "integer" ]
        convert => [ "total", "integer" ]
        convert => [ "totalsession", "integer" ]
        convert => [ "bandwidth", "integer" ]
        #rename => { "type" => "ftg-type" }
      }

      #Geolocate logs that have SourceAddress and if that SourceAddress is a non-RFC1918 address or APIPA address
      if [srcip] and [srcip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        geoip {
          database => "/etc/logstash/GeoLiteCity.dat"
          source => "srcip"
          target => "SourceGeo"
          add_tag => [ "traffic-wan" ]
        }
      }

      # Separate the RITM (internal) traffic from the internet traffic, for input into a second index
      if [srcip] and [srcip] =~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        mutate {
          add_tag => [ "src-traffic-ritm" ]
        }
      }

      #Geolocate logs that have DestinationAddress and if that DestinationAddress is a non-RFC1918 address or APIPA address
      if [dstip] and [dstip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        geoip {
          database => "/etc/logstash/GeoLiteCity.dat"
          source => "dstip"
          target => "DestinationGeo"
          add_tag => [ "traffic-wan" ]
        }
      }

      # Separate the RITM (internal) traffic from the internet traffic, for input into a second index
      if [dstip] and [dstip] =~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        mutate {
          add_tag => [ "dst-traffic-ritm" ]
        }
      }
    }
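As an aside, the private-address checks could also be written with the cidr filter instead of regexes. This is only a sketch; it assumes the logstash-filter-cidr plugin is installed, and the network list below is my guess at the ranges the regexes mean to cover:

```
filter {
  # Tag events whose source address falls in a private/loopback/link-local range
  cidr {
    address => [ "%{srcip}" ]
    network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
                 "127.0.0.0/8", "169.254.0.0/16" ]
    add_tag => [ "src-traffic-ritm" ]
  }
}
```

The same block with "%{dstip}" as the address would replace the destination regex.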

output.conf

output {
  if "traffic-wan" in [tags] {
    elasticsearch {
      hosts => ["es-client:9200"]
      index => "logstash-fortigate-%{+YYYY.MM.dd}"
    }
  }

  if "dst-traffic-ritm" in [tags] {
    elasticsearch {
      hosts => ["es-client:9200"]
      index => "logstash-fortigate-ritm%{+YYYY.MM.dd}"
    }
  }

I would like to check whether both "dst-traffic-ritm" and "src-traffic-ritm" are present in [tags]; if so, output to logstash-fortigate-ritm*, and if not, output to logstash-fortigate*.

A little bit of help would "once again" be appreciated; I'm sure it's something awfully stupid... but I don't get it.

if "dst-traffic-ritm" in [tags] and "src-traffic-ritm" in [tags] {
  elasticsearch {
    hosts => ["es-client:9200"]
    index => "logstash-fortigate-ritm%{+YYYY.MM.dd}"
  }
} else {
  elasticsearch {
    hosts => ["es-client:9200"]
    index => "logstash-fortigate-%{+YYYY.MM.dd}"
  }
}

Here's the complete output file:

output {
   if "dst-traffic-ritm" in [tags] and "src-traffic-ritm" in [tags] {
     elasticsearch {
         hosts => ["es-client:9200"]
         index => "logstash-fortigate-ritm%{+YYYY.MM.dd}"
          }
    else

     elasticsearch {
         hosts => ["es-client:9200"]
         index => "logstash-fortigate-%{+YYYY.MM.dd}"
          }
    }

     if [type] == "utm" {
     elasticsearch {
         hosts => ["es-client:9200"]
         template => "/template/es-ftg.json"
         template_overwrite => true
         index => "ftg-%{+YYYY.MM.dd}"
          }
    }
   if [type] == "traffic" {
     elasticsearch {
         hosts => ["es-client:9200"]
         template => "/template/es-ftg.json"
         template_overwrite => true
         index => "ftg-%{+YYYY.MM.dd}"
          }
    }
   if "eventlog" in [tags] {
    elasticsearch {
      index => "logstash-win-%{+YYYY.MM.dd}"
      hosts => ["es-client:9200"]
    }
  }
   if "apache" in [tags] {
    elasticsearch {
      index => "logstash-apache-%{+YYYY.MM.dd}"
      hosts => ["es-client:9200"]
      template => "/etc/logstash/elastic-apache-template.json"
      template_name => "apache"
      template_overwrite => true
    }
  }
   if "ossec" in [tags] {
    elasticsearch {
         hosts => ["es-client:9200"]
         index => "ossec-%{+YYYY.MM.dd}"
         document_type => "ossec"
         template => "/etc/logstash/elastic-ossec-template.json"
         template_name => "ossec"
         template_overwrite => true
    }
  }
   if "ASA" in [tags] {
    elasticsearch {
         hosts => ["es-client:9200"]
         index => "cisco-fw-%{+YYYY.MM.dd}"
    }
  }
   if [type] == "ossint" {
    elasticsearch {
         hosts => ["es-client:9200"]
         index => "ossint-%{+YYYY.MM.dd}"
    }
  }
   if "honeypotlog" in [tags] {
    elasticsearch {
         hosts => ["es-client:9200"]
         index => "logstash-honey-%{+YYYY.MM.dd}"
          }
    }
   if "hp-printers" in [tags] {
    elasticsearch {
      hosts => ["es-client:9200"]
      index => "hp_syslog-%{+YYYY.MM.dd}"
      document_type => "hp_device"
  }
}
#else if "alerts" in [tags] {
#     http {
#       http_method => "post"
#       format => "json"
#       mapping => ["environment", "Production", "event", "%{event}", "resource", "%{resource}", "text", "%{text}", "status", "open", "severity", "%{Severity}", "value", "%{value}"]
#       url => "http://capricorn:8080/api/alert"
#     }
#   }
   if "suricata" in [tags] {
    elasticsearch {
         hosts => ["es-client:9200"]
         manage_template => false
         index => "logstash-suricata-%{+YYYY.MM.dd}"
      }
   }
   if "metric" in [tags] {
    graphite {
            host => "health"
            metrics_format => "logstash.*"
            include_metrics => [ "events.*" ]
            fields_are_metrics => true
            reconnect_interval => 600
        }
    }
   if [type] == "traffic" {
     elasticsearch {
         hosts => ["es-client:9200"]
         index => "pan-traffic-%{+YYYY.MM.dd}"
          }
    }
   else if [type] == "url" {
       elasticsearch {
         hosts => ["es-client:9200"]
         index => "pan-url-%{+YYYY.MM.dd}"
          }
      }
}

It generates the following error:
The given configuration is invalid. Reason: Expected one of #, if, { at line 866, column 6 (byte 26860) after output {

if "dst-traffic-ritm" in [tags] and "src-traffic-ritm" in [tags] {
elasticsearch {
hosts => ["es-client:9200"]
index => "logstash-fortigate-ritm%{+YYYY.MM.dd}"
}
} else

{:level=>:fatal}

There are braces missing around the first else.

Just tried; same result, oddly.

output {
   if "dst-traffic-ritm" in [tags] and "src-traffic-ritm" in [tags] {
     elasticsearch {
         hosts => ["es-client:9200"]
         index => "logstash-fortigate-ritm%{+YYYY.MM.dd}"
          }
    }    else

     elasticsearch {
         hosts => ["es-client:9200"]
         index => "logstash-fortigate-%{+YYYY.MM.dd}"
          }
    }

There must be an opening brace after else.
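In other words, the conditional skeleton must look like this; both branches need their own pair of braces:

```
if <condition> {
  elasticsearch { ... }
} else {
  elasticsearch { ... }
}
```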

@magnusbaeck

I want my filter to behave as follows:

If src and dst are local traffic, then output to fortigate-ritm-*
If src is local traffic and dst is WAN traffic, then output to fortigate-*
If src is WAN traffic and dst is local, then output to fortigate-*

But right now I can't succeed in doing it... fortigate-ritm-* is working according to plan, BUT it acts as a duplicate "stripped" index of fortigate-*, and this is not what I want/need. fortigate-* currently keeps the src- and dst-traffic-ritm [tags] AND traffic-wan... I only want traffic-wan.

I'm not sure if I'm explaining my situation correctly, so here are the "new config" files...

Only the section matching IP addresses:

#Geolocate logs that have SourceAddress and if that SourceAddress is a non-RFC1918 address or APIPA address
if [srcip] and [srcip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    geoip {
         database => "/etc/logstash/GeoLiteCity.dat"
         source => "srcip"
         target => "SourceGeo"
         add_tag => [ "traffic-wan" ]
    }
}

# Separate the RITM (internal) traffic from the internet traffic, for input into a second index
if [srcip] and [srcip] =~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    mutate {
         add_tag => [ "src-traffic-ritm" ]
    }

    #Delete 0,0 in SourceGeo.location if equal to 0,0
    #if ([srcip.location] and [srcip.location] =~ "0,0") {
      #mutate {
        #["SourceGeo.location"] => "geo_point"
      #}
    #}
  }

#Geolocate logs that have DestinationAddress and if that DestinationAddress is a non-RFC1918 address or APIPA address
if [dstip] and [dstip] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    geoip {
         database => "/etc/logstash/GeoLiteCity.dat"
         source => "dstip"
         target => "DestinationGeo"
         add_tag => [ "traffic-wan" ]
    }
}

# Separate the RITM (internal) traffic from the internet traffic, for input into a second index
if [dstip] and [dstip] =~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    mutate {
         add_tag => [ "dst-traffic-ritm" ]
    }

    #Delete 0,0 in DestinationGeo.location if equal to 0,0
    #if ([dstip.location] and [dstip.location] =~ "0,0") {
      #mutate {
        #["DestinationGeo.location"] => "geo_point"
         #}
       #}
     }
  }

And the output:

output {
  if "FORTIGATE" in [tags] and "dst-traffic-ritm" in [tags] and "src-traffic-ritm" in [tags] {
    elasticsearch {
      hosts => ["es-client:9200"]
#     template => "/template/es-fortigate.json"
#     template_overwrite => true
      index => "logstash-fortigate-ritm-%{+YYYY.MM.dd}"
    }
  }
  else if "traffic-wan" in [tags] {
    elasticsearch {
      hosts => ["es-client:9200"]
#     template => "/template/es-fortigate.json"
#     template_overwrite => true
      index => "logstash-fortigate-%{+YYYY.MM.dd}"
    }
  }
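For what it's worth, since rules 2 and 3 both land in fortigate-*, the three rules above collapse into a single if/else. A sketch using the tag and index names from this thread:

```
output {
  if "src-traffic-ritm" in [tags] and "dst-traffic-ritm" in [tags] {
    # rule 1: both endpoints are local
    elasticsearch {
      hosts => ["es-client:9200"]
      index => "logstash-fortigate-ritm-%{+YYYY.MM.dd}"
    }
  } else if "FORTIGATE" in [tags] {
    # rules 2 and 3: traffic crossing the WAN boundary
    elasticsearch {
      hosts => ["es-client:9200"]
      index => "logstash-fortigate-%{+YYYY.MM.dd}"
    }
  }
}
```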

I'm positive somebody here will know precisely what I need... because right now I'm having a hard time, lol...

By the way, the goal is to keep different retention "histories": "local" traffic (used mainly for diagnostics) will be kept for about a week and then deleted, while WAN traffic will need to be kept for up to a full year...

Thank you, again!

fortigate-ritm-* is working according to plan, BUT it acts as a duplicate "stripped" index of fortigate-*, and this is not what I want/need. fortigate-* currently keeps the src- and dst-traffic-ritm [tags] AND traffic-wan... I only want traffic-wan

Please give an example of an event that has been put in the wrong index. Copy/paste from the JSON tab in Kibana's Discover panel. No screenshot please.

@magnusbaeck

Sorry for the delay; we don't have the same working hours, apparently. I was out of the office.

Here you go:

Index logstash-fortigate-* (should only have WAN traffic):

{
  "_index": "logstash-fortigate-ritm-2017.03.21",
  "_type": "syslog",
  "_id": "AVrxHo1oGwn1yOJq3cIy",
  "_score": null,
  "_source": {
    "@version": "1",
    "@timestamp": "2017-03-21T13:46:35.482Z",
    "host": "10.42.190.77",
    "port": 46600,
    "type": "syslog",
    "tags": [
      "main_syslog",
      "COUPEFEU",
      "FORTIGATE",
      "src-traffic-ritm",
      "dst-traffic-ritm",
      "_grokparsefailure"
    ],
    "syslog5424_pri": "189",
    "syslog_host": "10.26.196.251",
    "kv": "date=2017-03-21 time=10:04:55 devname=FG200B-MR devid=FG200B3910607673 logid=0001000014 type=traffic subtype=local level=notice vd=root srcip=10.216.97.240 srcport=137 srcintf=\"port1\" dstip=10.216.99.255 dstport=137 dstintf=unknown-0 sessionid=1596434600 proto=17 action=deny policyid=0 dstcountry=\"Reserved\" srccountry=\"Reserved\" trandisp=noop service=\"udp/137\" app=\"netbios forward\" duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat=\"unscanned\"",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_facility": "user-level",
    "syslog_severity": "notice",
    "date": "2017-03-21",
    "time": "10:04:55",
    "devname": "FG200B-MR",
    "devid": "FG200B3914607673",
    "logid": "0001000014",
    "level": "notice",
    "vd": "root",
    "srcip": "10.216.197.240",
    "srcport": "137",
    "srcintf": "port1",
    "dstip": "10.216.199.255",
    "dstport": "137",
    "dstintf": "unknown-0",
    "sessionid": "1596434600",
    "proto": "17",
    "action": "deny",
    "policyid": "0",
    "dstcountry": "Reserved",
    "srccountry": "Reserved",
    "trandisp": "noop",
    "service": "udp/137",
    "app": "netbios forward",
    "duration": 0,
    "sentbyte": 0,
    "rcvdbyte": 0,
    "sentpkt": 0,
    "appcat": "unscanned",
    "fingerprint": "a75819276990371bd94f8c858f28fc499494a712"
  },
  "fields": {
    "date": [
      1490054400000
    ],
    "bandwidth_usage": [
      0
    ],
    "@timestamp": [
      1490103995482
    ]
  },
  "highlight": {
    "tags": [
      "@kibana-highlighted-field@src@/kibana-highlighted-field@-@kibana-highlighted-field@traffic@/kibana-highlighted-field@-@kibana-highlighted-field@ritm@/kibana-highlighted-field@",
      "@kibana-highlighted-field@dst@/kibana-highlighted-field@-@kibana-highlighted-field@traffic@/kibana-highlighted-field@-@kibana-highlighted-field@ritm@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1490103995482
  ]
}

(I've modified a few fields for privacy purposes only, so don't worry if kv and the fields don't match...)

Thank you

So the event has both the src-traffic-ritm tag and the dst-traffic-ritm tag and it has been sent to the logstash-fortigate-ritm-2017.03.21 index. That's consistent with your configuration. Has the event been incorrectly tagged?

Ahhhh...
I think I just figured out why it's acting as a duplicate...

lemme guess...

logstash-fortigate-* also matches logstash-fortigate-ritm*

Right?

Yes, logstash-fortigate-* matches logstash-fortigate-ritm*.
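If that overlap is inconvenient for your index patterns, one option (just a sketch; any non-overlapping name would do) is to rename the RITM index so it is no longer matched by logstash-fortigate-*:

```
elasticsearch {
  hosts => ["es-client:9200"]
  # "logstash-ritm-*" is not matched by the logstash-fortigate-* pattern
  index => "logstash-ritm-%{+YYYY.MM.dd}"
}
```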

That explains everything...

So, in fact, there's no "real duplicate"; it's just the way Discover shows it... in reality my indices are working correctly...

That's cool.

Any input on my _grokparsefailure tag?

Any input on my _grokparsefailure tag?

It looks like your grok filter is working. Do you have more than one grok filter? (Check with grep grok /etc/logstash/conf.d/*.)

I have many, but for the FORTIGATE tag I have only one.

I've read somewhere (can't remember where...) that the reason we get _grokparsefailure is the syslog plugin, which already does a grok for us...

Remember, I'm running Logstash 2.4.

I've read somewhere (can't remember where...) that the reason we get _grokparsefailure is the syslog plugin, which already does a grok for us...

Could be, but IIRC the syslog input adds its own tag when its grokking fails.
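If the tag bothers you, the grok filter's tag_on_failure option can suppress it. For example, applied to the grok shown earlier in this thread:

```
grok {
  match => [ "message", "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:kv}" ]
  remove_field => [ "message", "syslog_timestamp" ]
  # add no tag when the match fails (default is ["_grokparsefailure"])
  tag_on_failure => []
}
```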

OK, so... I won't research any further then, since it doesn't seem to cause me any trouble.

One quick, last question...

Can you explain how it's possible that, with two servers (64 GB RAM, 8 CPUs at 3 GHz), I cannot get past 12 hours of data without the dreaded timeout on my dashboard?

3 shards, 1 replica, and about 71 GB daily index size...

How big is the ES heap?

31.94 GB on each server.

21.76 GB used on one server and 3.87 GB on the second. I've never seen it run out of heap so far.

Don't know what's up. Open a separate thread for that. Probably not in the Logstash group.