Logstash Email Output Plugin not using environmental variables

Hi All,
I'm very new to Elastic and Logstash and am having some issues outputting environmental variables to my email output plugin.

I have installed my stack through Selks (Suricata, Logstash, evebox etc etc) and all is configured correctly, I am just additionally setting up some external notifications to an email, then eventually through zapier to slack channel.

Below is the output I am getting to my email, as you can see the environmental variables under "_source" are working correctly, however are coming through as clear text for anything under alert. or geoip.

2024-02-06T00:38:05.952Z Test 1 ---- Test 3 1 ---- %{geoip.latitude} %{alert.action} %{flow.dest_port}

Below is the Alert, as seen in elastic

{
   _id: "WW3ae40BQ4suRNJyBmCZ",
   _ignored: [
      "packet.raw",
      "packet.keyword"
   ],
   _index: "logstash-alert-2024.02.06",
   _metadata: {
      aggregate: true,
      count: 1,
      escalated_count: 0,
      max_timestamp: "2024-02-06T00:38:21.955800+0000",
      min_timestamp: "2024-02-06T00:38:21.955800+0000"
   },
   _score: null,
   _source: {
      "@timestamp": "2024-02-06T00:38:21.955Z",
      "@version": "1",
      alert: {
         action: "allowed",
         category: "Potential Corporate Privacy Violation",
         gid: 1,
         metadata: {
            created_at: [
               "2010_07_30"
            ],
            updated_at: [
               "2019_07_26"
            ]
         },
         rev: 4,
         severity: 1,
         signature: "ET P2P BitTorrent DHT announce_peers request",
         signature_id: 2008585
      },
      app_proto: "bittorrent-dht",
      bittorrent_dht: {
         client_version: "4c54012f",
         request: {
            id: "8a6f54ccd5d08ecae972596ad468e4ecc42c94da",
            implied_port: 1,
            info_hash: "02414097c8ae42bb7f5271e69b77c5a7fd75495d",
            port: 3864,
            token: "0241"
         },
         request_type: "announce_peer",
         transaction_id: "4a1a"
      },
      capture_file: "/var/log/suricata/fpc//log-1707179720-6.pcap",
      community_id: "1:3IcellqSgGY9GH4VbUZYYoHLwhw=",
      dest_ip: "23.94.178.77",
      dest_port: 6904,
      direction: "to_server",
      event_type: "alert",
      flow: {
         bytes_toclient: 0,
         bytes_toserver: 201,
         dest_ip: "23.94.178.77",
         dest_port: 6904,
         pkts_toclient: 0,
         pkts_toserver: 1,
         src_ip: "10.1.32.207",
         src_port: 3864,
         start: "2024-02-06T00:38:21.955800+0000"
      },
      flow_id: 1571858375877007,
      geoip: {
         continent_code: "NA",
         country_code2: "CA",
         country_code3: "CA",
         country_name: "Canada",
         ip: "23.94.178.77",
         latitude: 43.6319,
         location: {
            lat: 43.6319,
            lon: -79.3716
         },
         longitude: -79.3716,
         timezone: "America/Toronto"
      },
      host: "suricata",
      in_iface: "tzsp0",
      packet: "SKmKGHltADCTEgfsCABFBAC7s1oAAEAR0lgKASDPF16yTQ8YGvgAp20/ZDE6YWQyOmlkMjA6im9UzNXQjsrpcllq1Gjk7MQslNoxMjppbXBsaWVkX3BvcnRpMWU5OmluZm9faGFzaDIwOgJBQJfIrkK7f1Jx5pt3xaf9dUldNDpwb3J0aTM4NjRlNDpzZWVkaTFlNTp0b2tlbjI6AkFlMTpxMTM6YW5ub3VuY2VfcGVlcjE6dDI6ShoxOnY0OkxUAS8xOnkxOnFl",
      packet_info: {
         linktype: 1
      },
      path: "/var/log/suricata/eve.json",
      payload: "ZDE6YWQyOmlkMjA6im9UzNXQjsrpcllq1Gjk7MQslNoxMjppbXBsaWVkX3BvcnRpMWU5OmluZm9faGFzaDIwOgJBQJfIrkK7f1Jx5pt3xaf9dUldNDpwb3J0aTM4NjRlNDpzZWVkaTFlNTp0b2tlbjI6AkFlMTpxMTM6YW5ub3VuY2VfcGVlcjE6dDI6ShoxOnY0OkxUAS8xOnkxOnFl",
      payload_printable: "d1:ad2:id20:.oT......rYj.h...,..12:implied_porti1e9:info_hash20:.A@...B..Rq..w...uI]4:porti3864e4:seedi1e5:token2:.Ae1:q13:announce_peer1:t2:J.1:v4:LT./1:y1:qe",
      pkt_src: "wire/pcap",
      proto: "UDP",
      src_ip: "10.1.32.207",
      src_port: 3864,
      stream: 0,
      tags: [
         "_geoip_lookup_failure"
      ],
      timestamp: "2024-02-06T00:38:21.955800+0000",
      type: "SELKS"
   },
   _type: "_doc",
   sort: [
      1707179901955
   ]
}

Below is my logstash.conf

input {
  file {
    path => ["/var/log/suricata/*.json"]
    #sincedb_path => ["/var/lib/logstash/"]
    sincedb_path => ["/usr/share/logstash/since.db"]
    codec =>   json
    type => "SELKS"
  }

}

filter {
  if [type] == "SELKS" {

    date {
      match => [ "timestamp", "ISO8601" ]
    }

    ruby {
      code => "
        if event.get('[event_type]') == 'fileinfo'
          event.set('[fileinfo][type]', event.get('[fileinfo][magic]').to_s.split(',')[0])
        end
      "
    }
    ruby {
      code => "
        if event.get('[event_type]') == 'alert'
          sp = event.get('[alert][signature]').to_s.split(' group ')
          if (sp.length == 2) and /\A\d+\z/.match(sp[1])
            event.set('[alert][signature]', sp[0])
          end
        end
      "
     }

    metrics {
      meter => [ "eve_insert" ]
      add_tag => "metric"
      flush_interval => 30
    }
  }

  if [http] {
    useragent {
       source => "[http][http_user_agent]"
       target => "[http][user_agent]"
    }
  }
  if [src_ip]  {
    geoip {
      source => "src_ip"
      target => "geoip"
      #database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
      #add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      #add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
  }
    if [dest_ip]  {
    geoip {
      source => "dest_ip"
      target => "geoip"
      #database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
      #add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      #add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
  }
}

output {
  if [event_type] and [event_type] != 'stats' {
    elasticsearch {
      hosts => "elasticsearch"
      index => "logstash-%{event_type}-%{+YYYY.MM.dd}"
      template_overwrite => true
      template => "/usr/share/logstash/config/elasticsearch7-template.json"
    }
}

if [event_type] and [event_type] == 'alert' {
 email {
	address => "smtp.mailgun.org"
	port => "587"
	username => "XXXXXXXX"
	password => "XXXXXXXXXX"
	authentication => "plain"
	use_tls => "true"
	from => "XXXXXX@email.com"
	subject => "Suricata Alert %{@timestamp}"
	to => "YYYYYY@email.com"
	via => "smtp"
	body => "%{@timestamp} Test 1 ---- Test 3 %{@version} ---- %{geoip.latitude} %{alert.action} %{flow.dest_port}"
}	
 }



else {
    elasticsearch {
      hosts => "elasticsearch"
      index => "logstash-%{+YYYY.MM.dd}"
      template_overwrite => true
      template => "/usr/share/logstash/config/elasticsearch7-template.json"
    }
  }
}

logstash uses a different syntax for fields inside of objects .... %{[geoip][latitude]}, %{[alert][action]} etc. This allows it to distinguish between {"a": { "b": "c"}} and {"a.b": "c"}

Thankyou so much. This has fixed it!

Appreciate your help!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.