Logstash - création d'index en fonction de l'IP de provenance

les logs que je reçoient sur la sortie standard ressemblent à ça :

"severity" => "info",
"@timestamp" => 2017-01-26T10:07:17.000Z,
"@version" => "1",
"host" => "client-log2",
"programname" => "ntpd",
"procid" => "1782",
"message" => " Listen normally on 2 lo 127.0.0.1 UDP 123",
"type" => "rsyslog",
"facility" => "daemon"

ou le host client-log2 est à pour config 192.168.0.1/24 et pour passerelle 192.168.0.254 ( une des interfaces du serveur Rsyslog )

Du coup, j'ai posté sur le forum #logstash voir si quelqu'un avait déjà eu ce problème, en attendant j'ai trouvé une autre possible solution qui pourrait correspondre à mon infra : grok

En me basant sur les noms d'hôtes ( qui sont relativement bien conventionnés sur mon infra ) je vais pourrais filtrer avec "match" et ajouter un tag en cas de match pour construire et classer mes logs dans mes indexes, ça a l'air de plutôt bien fonctionner, la conf donne ça :

filter {
	grok {	
		match => { "host" => "client-log1" }	
		add_tag => ["lan1"]
	}
	
	grok {
		match => { "host" => "client-log2" }
		add_tag => ["lan2"]
	}
}

output {
	if "lan1" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan1"
		}
	}
	else if "lan2" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan2"
		}
	}	

	else {
		stdout {
			codec => rubydebug	
		}
	}
}

Je pourrais affiner ou grossir le filtre avec des regex dans le filtre match au besoin.
Autre constat, les logs prennent systématiquement, en plus de lan1 et lan2, le tag "_grokparsefailure". Pour ceux du lan2 ça se comprends, car ils n'ont pas matché sur le premier grok mais ceux du second, ça m'échappe.
Je vais vérifier ça je n'ai pas encore regardé mais au pire, ce n'est pas bien dramatique, c'est juste que ça ne fait pas très propre de voir des "failure" sur les logs et ça peut induire en erreur.

Je ressaierais le filtre cidr / network sur mon environnement finale voir si ce n'est vraiment pas un problème de virtualisation mais au pire, j'utiliserais grok.
Dommage, ça avait l'air vraiment fin le filtrage cidr ... si j'ai des nouvelles sur l'autre post je te tiendrais quand même au courant ici, en tout cas merci encore pour ton aide et tes conseils.

J'ai eu plusieurs éléments de réponse sur #logstash , consultable ici : https://discuss.elastic.co/t/using-cidr-plugin-for-tagging-logs/72878/16 qui tournent autour de regex, je vais travailler la dessus.
Merci encore pour ton aide dadoonet.

Oui. J'ai vu que Magnus avait créé d'ailleurs:

Désolé du délais - juste pour clarifier car ca ne semblait pas clair:
le CIDR filter vérifie une ou des adresses IP spécifiées dans le champ de configuration du plugin address.

Il faut donc absolument fournir l'adresse IP à vérifier au CIDR filter et c'est pour ca que les exemples de configuration mentionnent:

address => [ "%{src_ip}", "%{dst_ip}" ]

ou

address => [ "%{clientip}" ]

ou l'adresse IP sera dans le event field [src_ip], [dst_ip] ou [clidentip].

Bonjour @colinsurprenant
Oui c'est ce que m'a expliqué Magnus sur #logstash , je n'avais pas bien saisie l'utilité de ce plugin. car je souhaitais filtrer les logs en fonction de leurs réseaux et non en fonction de leurs IP. Du coup, j'ai utilisé des regex. Pour ceux à qui ça peut servir, voila comment j'ai effectué mes filtres :

filter {
            if [host] =~ /10\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})/ {
            mutate { add_tag => "site1" }
            }

            else if [host] =~ /10\.11\.([0-9]{1,3})\.([0-9]{1,3})/ {
            mutate { add_tag => ["site2"] }
            }

}

Cela indique que SI un évènement comprend dans le champ "host" une adresse en 10.X.X.X (ou X est compris entre 0 et 9 et peut comporter 1 à 3 chiffres ), alors on applique le filtre mutate et on ajoute un tag "site1"

SINON SI un évènement comprend dans le champ "host" une adresse en 10.11.X.X (ou X est compris entre 0 et 9 et peut comporter 1 à 3 chiffres ), alors on applique le filtre mutate et on ajoute un tag "site2"

etc .... (on peut également ajouter directement un champ, cela revient au même.)

Ensuite, en sortie, j'ai appliqué :

output { 

        if "site1" in [tags] {
                elasticsearch {
                        hosts => "localhost:9200"
                        index => "site1"
                }
        }

        else if "site2" in [tags] {
                elasticsearch {
                        hosts => "localhost:9200"
                        index => "site2"
                }
    }
}

etc ...
Cela me permet d'indexer dans des index différents chaque évènement en fonction de leurs réseau de provenance.

Le plugin CIDR paraissait intéressant mais j'ai environ 1500 hôtes qui vont me générer des logs, je ne pouvais donc pas prévoir leurs IP/32 individuellement.

Cette partie a été résolu pour moi.
Merci quand même de ton aide, on a jamais assez d'explications.

Le seul "souci" que j'ai avec cette solution, c'est que tout les logs qui sont taggés prennent systématiquement le tag "_grokparsefailure", ce qui peut induire en erreur car au finale, les autres tags s'apposent bien.

Après quelques recherches, il semblerait que ce soit du non pas au filtres effectués mais à l'input "syslog" que j'utilise en entrée. En effet, un pré-filtrage est effectué quand on indique en input le type "syslog", qui se fait selon la RFC 3164 (BSD syslog protocol) alors que les logs sont généralement formatés selon la RFC 5424 (syslog protocol) qui comporte quelques champs différents. Du coup un premier "grok" est effectué lors de la récupération du log en input et certains champs ne correspondent pas, d'ou l'apposition du tag "_grokparsefailure".
(infos récupérés ici : http://stackoverflow.com/questions/28216269/grokparsefailure-on-successful-match )

Cela n'est pas bloquant en soit, je ne me suis donc pas encore penché sur ce problème mais si quelqu'un à une idée pour supprimer ce tag, je suis preneur :slight_smile:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.