Logstash - création d'index en fonction de l'IP de provenance

Bonjour à tous,
Etant, comme beaucoup, nouveau sur ELK, je rencontre un petit soucis par rapport à l'output de Logstash.
En fait, pour donner le contexte, j'ai un serveur Rsyslog qui centralise les logs d'un certains nombre d'équipements.
dans l'idée, je voudrais faire en sorte que selon l'adresse IP voir même idéalement du masque IP de l'équipement qui a émis le log, celui ci soit placé dans un index particulier et ce afin de faciliter les recherches dans Kibana par la suite.
Exemple, tout les équipements qui sont sur le réseau 10.10.0.0/16 soient indexés sous l'index "reseau1" et les équipements qui sont sur le réseau 10.10.10.0/24 soient indexés sur l'index "reseau2".
En soit rien de bien compliqué je pense, mais la syntaxe m'échappe un peu ... (beaucoup même pour être franc :slight_smile: )
En gros, un fichier de conf qui ressemblerait à ça :

input {
udp {
host => "SRV-logstash"
port => "5514" ### port configuré sur le serveur Rsyslog
codec => "json"
type => "rsyslog"
}
}

output {

if ip ==> "10.10.0.0/16" {
elasticsearch {
hosts => "localhost:9200"
index => "reseau1"
codec => "json"
}
}

else if ip ==> "10.10.10.0/24" {
elasticsearch {
hosts => "localhost:9200"
index => "reseau2"
codec => "json"
}
}

Avec ce fichier de conf, logstash me renvoi une dans les logs qui indique une configuration invalide après les caractères "if ip" (le 1er j'imagine, je ne pense pas que le second soit vérifié).
J'imagine que cette output n'est pas pris en compte par Logstash mais dans ce cas, comment lui indiquer les différentes sorties en fonction d'une adresse IP ?

Merci d'avance pour votre aide, au besoin je peux donner plus de détails bien entendu...

Peux-tu reformater ton code pour qu'il soit lisible?

</> peut aider.

J'utilise personnellement la syntaxe markdown:

```
CODE
```

Bonjour,
Désolé ce n'est pas dans mes habitudes de poster sur des forums. Et je ne comprends pas bien ce que tu me demandes, mon fichier de conf me parait clair, tu souhaites des balises indiquant les retours charriots etc ... ?

En tout cas merci de ta lecture.

Je voulais dire du code correctement indenté, donc lisible.

ah, ok tout simplement. Il est vrai que je ne fais pas attention à ce détail étant donné que Logstash n'y prête pas attention ...
Quoi qu'il en soit, voila ma conf un peu plus lisible, du moins je l'éspère :

input {
	udp {
		host => "SRV-logstash"
		port => "5514" ### port configuré sur le serveur Rsyslog
		codec => "json"
		type => "rsyslog"
		}
}

output {
	if ip ==> "10.10.0.0/16" {
		elasticsearch {
			hosts => "localhost:9200"
			index => "reseau1"
			codec => "json"
		}
	}

	else if ip ==> "10.10.10.0/24" {
		elasticsearch {
			hosts => "localhost:9200"
			index => "reseau2"
			codec => "json"
		}
	}

Merci. Je ne vois pas ce genre d'opérateur dans la doc: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals

Ça vient d'où ?

C'est spécifique aux IPs?

@colinsurprenant peut aider peut-être?

en fait je me suis plus inspiré de ce genre de fil : https://www.elastic.co/guide/en/logstash/current/plugins-filters-cidr.html et de celui ci : https://gist.github.com/mrlesmithjr/72e99caf36fcc2b5d323 malgré le fait que ce plugin soit utilisé en tant que filtre et non pas en tant que output, j'imagine qu'il existe l'équivalent mais je ne trouve rien à ce sujet. Je t'avoue que j'y vais un peu a taton sur mes fichiers de conf, car j'ai encore un peu de mal avec la syntaxe donc ça peut piquer les yeux :slight_smile:

Pense tu que je suis obligé de filtrer les données ( notamment concernant l'IP ) pour les utiliser en output ?
Ce qui me fais penser à ça, c'est que le type de donnée qui m’intéresse fait partie intégrante du log ( le champ host: )

@colinsurprenant semble être très actif sur ELK, en effet si il peut m'aiguiller ça serait super !

Merci.

Bonjour,
J'ai continué à étudier la question, et notamment du côté du filtre CIDR de logstash selon la doc suivante : https://www.elastic.co/guide/en/logstash/current/plugins-filters-cidr.html#plugins-filters-cidr-address qui me paraissait pas mal par rapport à ce que je souhaitait faire, j'ai donc écrit mon fichier de conf comme cela à des fins de test spour une seule machine pour l'instant :

input {
     udp {
             host => "SRV-ELK"
             port => "5514"
             codec => "json"
             type => "rsyslog"
     }
 }
 filter {
         cidr {
                 add_tag => "FQDN_du_client"
                 address => "%{IP_d'un_poste_de_test}"
                 network => "réseau_d'un_poste_de_test/CIDR"
 }
 output {
         elasticsearch {
                 hosts => localhost:9200"
                 index => "%{tags}"
                 codec => "json"
         }
 }

Mais j'ai systématiquement une erreur au lancement de logstash toujours au même endroit ( après un bon nombre de tests différents ) au niveau de l'accolade "{" juste après l'output "elasticsearch" qui indique "[ERROR][logstash.agent ] fetched an invalid config"

Le problème est surement du à une mauvaise utilisation du filtre CIDR car j'utilise cet output de la même façon ( à l’exception de l'index qui est donné en dur et non via la pseudo variable ) sans filtre et ça fonctionne.
Est ce quelqu'un peut m'aiguiller sur l'utilisation de ce filtre ?
D'avance merci.

Au cas où, sépare bien les blocs input, filter et output (ajoute un saut de ligne) et utilise des espaces et non des tabs et essaye de garder une indentation cohérente.

elasticsearch n'est pas au même niveau que udp par exemple.

Aussi %{IP_d'un_poste_de_test} me semble suspect.
Et évite les accents (même si ça devrait marcher à condition que ton fichier soit encodé en UTF8).

En fait, j'ai un peu de mal avec l'éditeur de texte du site, mais dans ma conf l'indentation est bonne, et ce sont bien des tab et non des espaces qui éclaircissement, je m'y suis bien fait sur tes conseils, c'est effectivement plus clair :slight_smile:
J'ai tenté quelque chose de plus simple comme cela :

input {
	udp {
		host => "SRV-ELK"
		port => "5514"
		codec => "json"
		type => "rsyslog"
	}
}
output {
	elasticsearch {
		hosts => "localhost:9200"
		index => "%{host}"
		codec => "json"
	}
}

Et les index se créent bien ( sauf quand les FQDN des clients sont en majuscule mais ça c'est une autre histoire )

Du coup je me suis résigné, je suis très certainement obligé de passer par un filtre. J'ai donc finalement repris mon fichier de conf plus haut en supprimant la ligne "address" dans le filtre car dans la finalité je devrait créer un index par réseau, un par sous réseau et un pour l'ensemble des réseaux.
Bien entendu, aucun accent ou apostrophes ne sont présent dans mon fichier de conf, j'indique cela de façon literale afin de ne pas avoir à donner d'adresse IP sur un forum public.

Mon fichier de conf actuel ressemble donc à cela :

input {
        udp {
                host => "SRV-ELK"
                port => "5514"
                codec => "json"
                type => "rsyslog"
        }
}

filter {
        cidr {
                add_tags => "LAN-TEST"
                network => "10.0.0.0/24"
        }

output {
        elasticsearch {
                hosts => "SRV-ELK:9200"
                index => "%{tags}"
        }
}

Par la suite, il y aura d'autres réseaux à tagger pour la création d'autres indexes mais pour le moment, j'aimerais me contenter faire fonctionner ça.

Mais comme tout à l'heure j'ai toujours une erreur sur le caractère "{" situé après "elasticsearch" dans l'output, je commence à avoir des doutes sur l'utilisation de "%tags" pour la définition de l'index, je n'ai trouvé qu'un seul exemple sur un github inactif depuis pas mal de temps ... Or je suis sur les dernières version de ELK ( 5.1.2 )

En tout cas une fois de plus merci de ton aide.

This can be dynamic using the %{foo} syntax.

Idéalement commence sans elasticsearch. Avec un stdout output et un codec ruby_debug

output {
  stdout { codec => rubydebug }
}

Bon, ça avance, tout doucement ...
Une conf tel que celle ci est bien accepté, Logstash se lance :

input {
	udp {
		host => "SRV-ELK"
		port => "5514"
		codec => "json"
		type => "rsyslog"
	}
}

filter {
	cidr {	
		network => "10.0.0.0/24"
		add_tag => [ "lan1" ]
	}
}

output {
	if "lan1" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan1"
		}
	}
	else {
		stdout {
			codec => rubydebug	
		}
	}
}

Mais lorsque je génère un événement sur un des hôtes sur le réseau 10.0.0.0/24 qui est bien reporté sur le serveur Rsyslog, celui ci apparait sur la fenêtre de lancement de logstash, le log n'est donc pas pris en compte par le filtre ... Dans le doute je vérifie les index crées avec la commande curl 'localhost:9200/_cat/indices?v' mais l'index lan1 n'apparait pas.
J'ai pensé à un problème d'infra car je fais tout mes tests en environnement virtuel ( virtualbox ) donc toutes mes machines ( ELK, serveur Rsyslog et les deux clients "générateurs de logs" ) sont virtualisés mais quand je fais un tcpdump sur mon elk je vois bien le trafic provenant du réseau 10.0.0.0/24 ... Je suis un peu paumé du coup ...

Concernant l'utilisation de la syntaxe %{foo} pour la création d'indexes dynamique, je ne trouve pas d'exemples concret ... saurais tu m'en dire plus ?

Merci, A+

Et je continue d'avancer ...
Voila ma nouvelle conf :

input {
        	udp {
        		host => "SRV-ELK"
        		port => "5514"
        		codec => "json"
        		type => "rsyslog"
        	}
}

filter {
	cidr {	
		network => "172.16.0.0/16"
	}
	mutate { 
		add_tag => ["lan1"] 
	}
	cidr {
		network => "192.168.0.0/24"
	}
	mutate {
		add_tag => ["lan2"]
	}
}

output {
	if "lan1" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan1"
		}
	}
	else if "lan2" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan2"
		}
	}	

	else {
		stdout {
			codec => rubydebug	
		}
	}
}

ça fonctionne, du moins sur le principe ...
Mais il doit me manquer des séparateurs quelque part, parce que concrètement il me tag tout les logs avec le tag lan1, lan2 et qu'il crée un seul index ( lan1 ) et met tout dedans.

C'est logique.

Tu as écrit dans les filtres:

Mets un tag lan1.
Mets un tag lan2

Puis dans output tu mets

si lan1, alors écrit dans lan1 sinon si lan2 ....

Du coup lan1 est toujours dans les tags et tu écris dans lan1 seulement.

Il semblerait bien, mais dans l'idée je souhaitais faire un deux filtre CIDR qui check le réseau de provenance du log et le traite en conséquence :

  • 1er cidr --> réseau 172.16.0.0/16 ?
    ----------> si oui alors tag lan1
    --------------------------------------------> sinon passe vers 2eme cidr

  • 2eme cidr --> réseau 192.168.0.0/24 ?
    -----------> si oui alors tag lan2
    --------------------------------------------> sinon pas de filtres et passage à l'output

Et l'output reprends ce principe

  • Si tu as lan1 de taggé, alors envoyé dans index nommé "lan1"

  • sinon si tu as lan2 de taggé, alors envoyé dans index nommé "lan2"

  • sinon affichage sur l'écran

Mais étant donné le résultat, je n'utilise pas comme il faut le filtre cidr.
Pourtant il me semblait bien que c'était sa fonction ?
https://www.elastic.co/guide/en/logstash/current/plugins-filters-cidr.html

Il faut écrire qq chose du genre:

filter {
  cidr {
    network => "172.16.0.0/16"
    add_tag => ["lan1"]
  }
}

J'avais déjà essayé l'option add_tag du filtre cidr, mais celle ci ne semble pas fonctionner car avec une conf comme celle ci :

filter {
	cidr {	
		network => "172.16.0.0/16"
		add_tag => ["lan1"]
	}


cidr {
    	network => "192.168.0.0/24"
		add_tag => ["lan2"]
	}
}

output {
	if "lan1" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan1"
		}
	}
	else if "lan2" in [tags] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "lan2"
		}
	}	

else {
	stdout {
		codec => rubydebug	
	}
}

}

Tous mes logs sortent sur la sortie standard, c'est pour ça que j'étais passé par le filtre mutate.

Il faut d'abord résoudre le problème de cidr et éventuellement remonter un bug si il s'avère que c'est un bug.

Je n'ai jamais utilisé ça donc je ne peux pas trop aider là.

Essaye de poster une question du coup sur #logstash (ou ici) avec la conf la plus simple possible:

input { stdin {} }

filter {
  cidr {	
    network => "172.16.0.0/16"
    add_tag => ["lan1"]
  }
}

output {
  stdout { codec => rubydebug }
}

Puis un truc du genre:

echo "XXXXXX" | bin/logstash -f myconf.conf

Ca permettra de facilement reproduire le problème.

Et bien en fait, il y a une chose qui me chagrine sur cidr...
r la doc officiel, il est décrit comme étant : " The CIDR filer is for checking IP addresses in events against a list of network blocks that might contain it"
Mais je me demande si il ne se base pas sur le réseau de la machine qui envois le log, et dans ce cas ça explique que le réseau ne match pas sur ma maquette car mon rsyslog n'est envoi ses logs à ELK sur un autre réseau que le 192.168.0.0/24 et le 172.16.0.0/16 ...

Je vais chercher sur d'autres posts pour voir si il n'y a pas eu de cas similaire mais je suis quand même étonné de faire face à un bug, je ne dois pas être le premier à utiliser cette fonction de cidr dans le même but.

Ce qui m'embête un peu, c'est que je suis complétement dépendant de cidr au moins pour sa fonction network qui m'est indispensable pour identifier le lan de provenance ... Je te tiens au courant, merci pour ton aide.

Les filtres filtrent les events.

Quelle est la tête d'un event en entrée ?

Je ne comprends pas.