Hi All,
I'm very new to Elastic and Logstash and am having some issues outputting environmental variables to my email output plugin.
I have installed my stack through Selks (Suricata, Logstash, evebox etc etc) and all is configured correctly, I am just additionally setting up some external notifications to an email, then eventually through zapier to slack channel.
Below is the output I am getting to my email, as you can see the environmental variables under "_source" are working correctly, however are coming through as clear text for anything under alert. or geoip.
2024-02-06T00:38:05.952Z Test 1 ---- Test 3 1 ---- %{geoip.latitude} %{alert.action} %{flow.dest_port}
Below is the Alert, as seen in elastic
{
_id: "WW3ae40BQ4suRNJyBmCZ",
_ignored: [
"packet.raw",
"packet.keyword"
],
_index: "logstash-alert-2024.02.06",
_metadata: {
aggregate: true,
count: 1,
escalated_count: 0,
max_timestamp: "2024-02-06T00:38:21.955800+0000",
min_timestamp: "2024-02-06T00:38:21.955800+0000"
},
_score: null,
_source: {
"@timestamp": "2024-02-06T00:38:21.955Z",
"@version": "1",
alert: {
action: "allowed",
category: "Potential Corporate Privacy Violation",
gid: 1,
metadata: {
created_at: [
"2010_07_30"
],
updated_at: [
"2019_07_26"
]
},
rev: 4,
severity: 1,
signature: "ET P2P BitTorrent DHT announce_peers request",
signature_id: 2008585
},
app_proto: "bittorrent-dht",
bittorrent_dht: {
client_version: "4c54012f",
request: {
id: "8a6f54ccd5d08ecae972596ad468e4ecc42c94da",
implied_port: 1,
info_hash: "02414097c8ae42bb7f5271e69b77c5a7fd75495d",
port: 3864,
token: "0241"
},
request_type: "announce_peer",
transaction_id: "4a1a"
},
capture_file: "/var/log/suricata/fpc//log-1707179720-6.pcap",
community_id: "1:3IcellqSgGY9GH4VbUZYYoHLwhw=",
dest_ip: "23.94.178.77",
dest_port: 6904,
direction: "to_server",
event_type: "alert",
flow: {
bytes_toclient: 0,
bytes_toserver: 201,
dest_ip: "23.94.178.77",
dest_port: 6904,
pkts_toclient: 0,
pkts_toserver: 1,
src_ip: "10.1.32.207",
src_port: 3864,
start: "2024-02-06T00:38:21.955800+0000"
},
flow_id: 1571858375877007,
geoip: {
continent_code: "NA",
country_code2: "CA",
country_code3: "CA",
country_name: "Canada",
ip: "23.94.178.77",
latitude: 43.6319,
location: {
lat: 43.6319,
lon: -79.3716
},
longitude: -79.3716,
timezone: "America/Toronto"
},
host: "suricata",
in_iface: "tzsp0",
packet: "SKmKGHltADCTEgfsCABFBAC7s1oAAEAR0lgKASDPF16yTQ8YGvgAp20/ZDE6YWQyOmlkMjA6im9UzNXQjsrpcllq1Gjk7MQslNoxMjppbXBsaWVkX3BvcnRpMWU5OmluZm9faGFzaDIwOgJBQJfIrkK7f1Jx5pt3xaf9dUldNDpwb3J0aTM4NjRlNDpzZWVkaTFlNTp0b2tlbjI6AkFlMTpxMTM6YW5ub3VuY2VfcGVlcjE6dDI6ShoxOnY0OkxUAS8xOnkxOnFl",
packet_info: {
linktype: 1
},
path: "/var/log/suricata/eve.json",
payload: "ZDE6YWQyOmlkMjA6im9UzNXQjsrpcllq1Gjk7MQslNoxMjppbXBsaWVkX3BvcnRpMWU5OmluZm9faGFzaDIwOgJBQJfIrkK7f1Jx5pt3xaf9dUldNDpwb3J0aTM4NjRlNDpzZWVkaTFlNTp0b2tlbjI6AkFlMTpxMTM6YW5ub3VuY2VfcGVlcjE6dDI6ShoxOnY0OkxUAS8xOnkxOnFl",
payload_printable: "d1:ad2:id20:.oT......rYj.h...,..12:implied_porti1e9:info_hash20:.A@...B..Rq..w...uI]4:porti3864e4:seedi1e5:token2:.Ae1:q13:announce_peer1:t2:J.1:v4:LT./1:y1:qe",
pkt_src: "wire/pcap",
proto: "UDP",
src_ip: "10.1.32.207",
src_port: 3864,
stream: 0,
tags: [
"_geoip_lookup_failure"
],
timestamp: "2024-02-06T00:38:21.955800+0000",
type: "SELKS"
},
_type: "_doc",
sort: [
1707179901955
]
}
Below is my logstash.conf
input {
file {
path => ["/var/log/suricata/*.json"]
#sincedb_path => ["/var/lib/logstash/"]
sincedb_path => ["/usr/share/logstash/since.db"]
codec => json
type => "SELKS"
}
}
filter {
if [type] == "SELKS" {
date {
match => [ "timestamp", "ISO8601" ]
}
ruby {
code => "
if event.get('[event_type]') == 'fileinfo'
event.set('[fileinfo][type]', event.get('[fileinfo][magic]').to_s.split(',')[0])
end
"
}
ruby {
code => "
if event.get('[event_type]') == 'alert'
sp = event.get('[alert][signature]').to_s.split(' group ')
if (sp.length == 2) and /\A\d+\z/.match(sp[1])
event.set('[alert][signature]', sp[0])
end
end
"
}
metrics {
meter => [ "eve_insert" ]
add_tag => "metric"
flush_interval => 30
}
}
if [http] {
useragent {
source => "[http][http_user_agent]"
target => "[http][user_agent]"
}
}
if [src_ip] {
geoip {
source => "src_ip"
target => "geoip"
#database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
#add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
#add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
}
if [dest_ip] {
geoip {
source => "dest_ip"
target => "geoip"
#database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
#add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
#add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
}
}
output {
if [event_type] and [event_type] != 'stats' {
elasticsearch {
hosts => "elasticsearch"
index => "logstash-%{event_type}-%{+YYYY.MM.dd}"
template_overwrite => true
template => "/usr/share/logstash/config/elasticsearch7-template.json"
}
}
if [event_type] and [event_type] == 'alert' {
email {
address => "smtp.mailgun.org"
port => "587"
username => "XXXXXXXX"
password => "XXXXXXXXXX"
authentication => "plain"
use_tls => "true"
from => "XXXXXX@email.com"
subject => "Suricata Alert %{@timestamp}"
to => "YYYYYY@email.com"
via => "smtp"
body => "%{@timestamp} Test 1 ---- Test 3 %{@version} ---- %{geoip.latitude} %{alert.action} %{flow.dest_port}"
}
}
else {
elasticsearch {
hosts => "elasticsearch"
index => "logstash-%{+YYYY.MM.dd}"
template_overwrite => true
template => "/usr/share/logstash/config/elasticsearch7-template.json"
}
}
}