Quick question. Logstash Multiple output to ES

Hello, once again, its me !

( yeah .. i know .. )

anyway!

I would like to know how would someone succeed in re-indexing to a second index only part of the "logs" that match XY filter.

lemme give you an exemple...
I currentely index everything coming in my network.log file wich are fortigate logs, for now.

We woud like to be able to "fetch" only those whose dstip match local subnet AND reindex em in a new "index" named "ritm-*" that would permit me, to simply run a crontab and delete that specific index each day and restart over ( to save space ) while, having the whole data on my other index. for future investigation.

long story made short, heres my config so far :

apprentely, this, doesnt work. dont know why .. could you guys give me a quick hand please ?

Thank you !

heres the error :

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/10-network_log.conf --configtest

The given configuration is invalid. Reason: Expected one of #, { at line 20, column 9 (byte 291) after filter{

grok {
match => [
"message",
"%{TIMESTAMP_ISO8601:logtimestamp} %{GREEDYDATA:kv}"
]
remove_field => ["message"]
}

if ([dstip] =~ /^10./) {
add_tag {:level=>:fatal}

and again , the config

cat 10-network_log.conf

input {
file {
path => ["/var/log/network.log"]
start_position => "beginning"
type => "FTG"
}
}

filter{

grok {
match => [
"message",
"%{TIMESTAMP_ISO8601:logtimestamp} %{GREEDYDATA:kv}"
]
remove_field => ["message"]
}

if ([dstip] =~ /^10./) {
add_tag => "traffic_lan"
}

kv {
source => "kv"
field_split => " "
value_split => "="
}

date {
match => ["logtimestamp", "ISO8601"]
locale => "en"
remove_field => ["logtimestamp"]
}

mutate {
convert => ["rcvdbyte", "integer"]
convert => ["countdlp", "integer"]
convert => ["countweb", "integer"]
convert => ["countav", "integer"]
convert => ["countemail", "integer"]
convert => ["countips", "integer"]
convert => ["duration", "integer"]
convert => ["sentpkt", "integer"]
convert => ["rcvdpkt", "integer"]
convert => ["sentbyte", "integer"]
convert => ["shaperdroprcvdbyte", "integer"]
convert => ["shaperdropsentbyte", "integer"]
convert => ["filesize", "integer"]
convert => ["count", "integer"]
convert => ["total", "integer"]
convert => ["totalsession", "integer"]
convert => ["bandwidth", "integer"]
add_tag => "fortigate_log"

}

geoip{
source =>"dstip"
database =>"/opt/logstash/GeoLiteCity.dat"
}

geoip{
source =>"srcip"
database =>"/opt/logstash/GeoLiteCity.dat"
}

}

cat 50-elasticsearch-output.conf

output {
if "fortigate_log" in [tags] {
elasticsearch {
hosts => ["localhost:9200"]
index => ["ftg-%{+YYYY.MM.dd}"]
}
}

else if "traffic_lan" in [tags] {
elasticsearch {
hosts => ["localhost:9200"]
index => ["ritm-%{+YYYY.MM.dd}"]
}

}

else {
file {
path => "/var/log/logstash/unknown_messages.log"
}

}
}

somebody could help me please ?

if ([dstip] =~ /^10./) {
add_tag => "traffic_lan"
}

add_tag needs to be inside a filter. Put it in a mutate filter, for example.

1 Like

Thank you.