Logstash aggregate nested field


#1

Bonjour,

Je veux regrouper les logiciels par un id (nom pc). Pour cela j'utilise le filtre aggregate. Mais le regroupement ne s'effectue pas.
Peut être je rate quelque chose (ci-dessous mon code):

input {
 elasticsearch {
    hosts => ["localhost:9200"]
    index => "pc-2018.02.19"
   query => '{ "query": { "query_string": { "query": "*" } },"_source": 
   ["beat.hostname","system.process.name"]}'
   docinfo => true
  docinfo_target => "@metadata"
  scroll => "5m"
  tags => "pc_software"
 }
}

filter {
    if "pc_software" in [tags] {
             aggregate {
               task_id => "%{[beat][hostname]}"
               code => "
                     map['[beat][hostname]'] = event.get('[beat][hostname]')
                     map['software'] ||= []
                     map['software'] << {'[software_name]' => event.get('[system][process][name]')}
                     #event.cancel()
               "
               push_map_as_event_on_timeout => true
               timeout => 1000
            }
           mutate {
             remove_field => ["[beat][hostname]", "[system][process][name]"]
           }
    }
}

output {
 if "pc_software" in [tags] {
      elasticsearch {
         hosts => ["localhost:9200"]
         index => "pc_logiciel-%{+YYYY.MM.dd}"
   }
  }
}

Merci d'avance pour votre aide!


(Fabien Baligand) #2

Qu'entends tu exactement par "le regroupement ne se fait pas" ?
Peux-tu donner un exemple donnant ce que tu attends en sortie, et ce que tu obtiens au final ?

Sinon, en premières remarques :

  • ton timeout est très long : 1000 secondes, donc il faut attendre 1000 secondes avant que ton évènement aggrégé soit généré
  • attention, si tu écris map['[beat][hostname]'], tu crées un champ qui s'appelle exactement comme ça, tu ne crées pas un champ 'beat' avec dedans un champ 'hostname'
    pour faire ça, il faut écrire : map['beat'] = {}; map['beat']['hostname'] = event.get('[beat][hostname]')
  • je suis surpris que ton dernier filtre supprime le champ "hostname", saches que ce champ sera aussi supprimé sur l'évènement aggrégé généré au timeout

#3

Bonjour,
Merci de me répondre.

Le champ [beat][hostname] est un champ imbriqué d'où cette syntaxe pour pouvoir avoir la valeur du hostname.

Ceci j'ai changé mon code pour voir si c'était les champs imbriqués en les remplacant. Aussi j'ai trié les données dans la requête pas ordre croissant , ajouté push_previous_map_as_event => true, et le temps à 5s . Cependant j'ai toujours pas de résultats . Je sais pas si c'est le plugin ou quoi d'autre dans mon code qui cloche.
ci dessous mon code:

   input {
     elasticsearch {
    hosts => ["localhost:9200"]
   index => "pc-2018.02.19"
   query => '{ "query": { "query_string": { "query": "*" } },"sort" :[{"beat.hostname":{"order" 
   :"asc"}}],"size":20,"_source": ["beat.hostname","system.process.name"]}'
    docinfo => true
    docinfo_target => "@metadata"
   scroll => "5m"
   tags => "pc_logiciel"
  }
}

filter {
    if "pc_logiciel" in [tags] {
		mutate {
				replace => { "logiciel" => "%{[system][process][name]}"}
		}
		mutate {
				replace => { "pc" => "%{[beat][hostname]}"}
		}
		aggregate {
		   task_id => "%{pc}"
		   code => "
				 map['pc'] = event.get('pc')
				 map['logiciel'] ||= []
				 map['logiciel'] << {'[logiciel]' => event.get('logiciel')}
				 #event.cancel()
		   "
		   push_previous_map_as_event => true
		   timeout => 5
		   timeout_tags => ["aggregate"]
	   }
    }
    if "aggregate" not in [tags] {
            drop{}
    }
}

output {
  if "pc_logiciel" in [tags] {
    elasticsearch {
    hosts => ["localhost:9200"]
    index => "pc_logiciel-%{+YYYY.MM.dd}"
    }
  }
}

(Fabien Baligand) #4

A priori, je ne vois pas de problème majeur dans ta configuration...
J'aurais ptet juste changé cette ligne :
map['logiciel'] << {'[logiciel]' => event.get('logiciel')}
par :
map['logiciel'] << {'logiciel' => event.get('logiciel')}
mais c'est un détail.

Pour que tu puisses debugger la situation, je pense que ce serait bien de mettre comme sortie stdout { codec => rubydebug }
Et que tu ne supprimes pas les évènements qui ne sont pas aggrégés, seulement ceux qui ne correspondent pas à la condition "pc_logiciel" in [tags].
Je soupçonne que tu n'aies pas d'évènements qui rentrent dans le "if".

J'ai déjà utilisé le même genre de configuration avec aggregate et n'ai pas eu de problème.
A tout hasard, quelle version utilises-tu de Logstash et du filtre aggregate ?
Pour la version du filtre aggregate, tu peux voir ça dans le fichier $LOGSTASH_HOME/Gemfile.lock


#5

La version de logstash est : 5.6.4 et la version du plugin est: 2.7.2

J'ai fait la modification en changeant :

map['logiciel'] << {'[logiciel]' => event.get('logiciel')}

par:

map['logiciel'] << {'logiciel' => event.get('logiciel')}

Ensuite j'ai enlevé le if :

filter {
	mutate {
			replace => { "logiciel" => "%{[system][process][name]}"}
	}
	mutate {
			replace => { "pc" => "%{[beat][hostname]}"}
	}
	 aggregate {
	   task_id => "%{pc}"
	   code => "
			 map['pc'] = event.get('pc')
			 map['logiciel'] ||= []
			 map['logiciel'] << {'logiciel' => event.get('logiciel')}
	   "
	   push_previous_map_as_event => true
	   timeout => 5
	   timeout_tags => ["aggregate"]
	}
    if "aggregate" not in [tags] {
            drop{}
    }
} 

ET mon losgtsah ne fait que redémarrer en me générant une erreur comme si quelque chose c'était cassé car j'ai essayé des codes qui fonctionnaient auparavant ou encore codes simples mais j'ai cette erreur:

[2018-02-22T11:37:49,192][ERROR][logstash.agent           ] Pipeline aborted due to error {:exception=>#
<Manticore::UnknownException: Unexpected error: java.security.InvalidAlgorithmParameterException: 
  the trustAnchors parameter must be non-empty>, :backtrace=>
   ["/usr/share/logstash/vendor/bundle/jruby/1.9/gems/manticore-0.6.1-
  java/lib/manticore/response.rb:37:in `initialize'", "org/jruby/RubyProc.java:281:in `call'", 
  /usr/share/logstash/vendor/bundle/jruby/1.9/gems/manticore-0.6.1-
   java/lib/manticore/response.rb:79:in `call'", 
  "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
  java/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb:74:in `perform_request'", 
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
  java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:279:in `perform_request_to_url'", 
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
  java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:241:in `healthcheck!'", 
  "org/jruby/RubyHash.java:1342:in `each'", 
  "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
  java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:237:in `healthcheck!'", 
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
   java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:329:in `update_urls'", 
 "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
  java/lib/logstash/outputs/elasticsearch/http_client/pool.rb:70:in `start'", 
  "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
  java/lib/logstash/outputs/elasticsearch/http_client.rb:290:in `build_pool'", 
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
 java/lib/logstash/outputs/elasticsearch/http_client.rb:60:in `initialize'", 
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
 java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:101:in `create_http_client'", 
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
java/lib/logstash/outputs/elasticsearch/http_client_builder.rb:97:in `build'", 
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
java/lib/logstash/outputs/elasticsearch.rb:230:in `build_client'", 
"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-7.4.2-
java/lib/logstash/outputs/elasticsearch/common.rb:24:in `register'", "/usr/share/logstash/logstash-
 core/lib/logstash/output_delegator_strategies/shared.rb:9:in `register'", "/usr/share/logstash/logstash-
 core/lib/logstash/output_delegator.rb:43:in `register'", "/usr/share/logstash/logstash-
 core/lib/logstash/pipeline.rb:290:in `register_plugin'", "/usr/share/logstash/logstash-
 core/lib/logstash/pipeline.rb:301:in `register_plugins'", "org/jruby/RubyArray.java:1613:in `each'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:301:in `register_plugins'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:310:in `start_workers'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:235:in `run'", 
"/usr/share/logstash/logstash-core/lib/logstash/agent.rb:408:in `start_pipeline'"]}

#6

Bonsoir Monsieur fbaligand,

J'ai fait comme vous l'avez suggéré, cependant j'ai comme sorties des messages d'erreurs:

[2018-02-23T17:57:48,376][WARN ][logstash.outputs.elasticsearch] Could not index event to 
Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"test-pr-pc_logiciel-2018.02.23", 
:_type=>"logs", :_routing=>nil}, 2018-02-23T16:57:48.338Z %{host} %{message}], :response=>
{"index"=>{"_index"=>"test-pr-pc_logiciel-2018.02.23", "_type"=>"logs", 
"_id"=>"AWHDmXA7dcwBLBnIf8UK", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", 
 "reason"=>"failed to parse [logiciel]", "caused_by"=>{"type"=>"illegal_state_exception", 
"reason"=>"Can't get text on a START_OBJECT at 1:88"}}}}}
[2018-02-23T17:57:48,376][WARN ][logstash.outputs.elasticsearch] Could not index event to 
Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"test-pr-pc_logiciel-2018.02.23", 
:_type=>"logs", :_routing=>nil}, 2018-02-23T16:57:48.338Z %{host} %{message}], :response=>
{"index"=>{"_index"=>"test-pr-pc_logiciel-2018.02.23", "_type"=>"logs", 
"_id"=>"AWHDmXA7dcwBLBnIf8UL", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", 
"reason"=>"failed to parse [logiciel]", "caused_by"=>{"type"=>"illegal_state_exception", 
 "reason"=>"Can't get text on a START_OBJECT at 1:87"}}}}}

alors que ma configuration me semble correcte:

 input {
    elasticsearch {
   hosts => ["localhost:9200"]
  index => "z76-pr-2018.02.19"
  query => '{ "query": { "query_string": { "query": "*" } },"sort" :[{"beat.hostname":{"order" 
  :"asc"}}],"size":20,"_source": ["beat.hostname","system.process.name"]}'
  size => 10
  ssl => true
  ca_file => "/etc/logstash/ssl/ca.pem"
  docinfo => true
 docinfo_target => "@metadata"
  scroll => "5m"
  tags => "pc_logiciel"
  }
 }

filter {
if "pc_logiciel" in [tags] {
	mutate {
		replace => { "logiciel" => "%{[system][process][name]}"}
	}
	mutate {
		replace => { "pc" => "%{[beat][hostname]}"}
	}
	 aggregate {
	   task_id => "%{pc}"
	   code => "
			 map['pc'] = event.get('pc')
			 map['logiciel'] ||= []
			 map['logiciel'] << {'logiciel' => event.get('logiciel')}
	   "
	   push_previous_map_as_event => true
	   timeout => 5
	   timeout_tags => ["aggregate"]
	}
}
else {
   drop{}
 }
}

output {
  if "pc_logiciel" in [tags] {
    elasticsearch {
    hosts => ["localhost:9200"]
    index => "pc_logiciel-%{+YYYY.MM.dd}"
 }
}
stdout { codec => rubydebug }
}

Je sais pas ce qui cloche vraiment car tout semble correct. Quand j'enlève le plugin aggregate j'importe les données correctement. Mais avec le plugin j'ai des erreurs.
Où se trouve l'erreur dans ma config ou c'est le plugin qui marche pas correctement?

Ps: j'ai changé mon logstash la nouvelle version est 5.8.1 et le plugin aggreagte:2.7.2

Merci par avance de votre aide !


(Fabien Baligand) #7

Il semblerait que tu aies un problème de mapping Elasticsearch.

Vu ton code dans le filtre aggregate, en sortie, tu devrais avoir :
{‘pc’: ‘pc1’, ‘logiciel’: [ { ‘logiciel’: ‘logiciel1’ } ] }

Et au vu du message d’erreur ES, ton mapping ES du champ racine ‘logiciel’ est de type ‘text’, ce qui est incompatible avec une structure json.
Donc il faut soit adapter ton mapping ES soit ton code aggregate.

Et sinon, la version 5.8.1 de Logstash n’existe pas.


#8

Merci de me repondre.

Pour le version, Je me suis trompé c'est plutot 5.6.8 et j'ai aussi fait le test sur d'autres serveurs qui ont une version 5.6.3 ou 5.6.4.

Pour le mapping j'en ai pas fait. Je voulais importer les données et laisser elasticsearch fait un mapping et des typages par défaut car actuellement j'ai aucun index crée.

J'essaierai de faire un template avant import des données enfin d'adapter le mapping le Lundi vu qu'on est en week-end et j'ai pas accès au serveur. Ensuite je vous ferai un retour.

Merci de votre aide!


(Fabien Baligand) #9

Quelques petites dernières remarques pour lundi :

  • une autre option que changer le mapping ES est de changer le code 'aggregate' :
    map['logiciel'] << event.get('logiciel'). Tu as ainsi un simple tableau de string.
  • pour être sûr que les évènements soient traités dans le bon ordre, il faut que le nombre de "pipeline workers" soit égal à 1. Pour cela, il suffit d'ajouter le flag -w 1 au démarrage de Logstash.
  • je pense que tu as très bien fait de finalement utiliser l'option push_previous_map_as_event => true, qui est beaucoup plus efficace et moins consommatrice en mémoire pour ton cas d'utilisation.
  • l'output "stdout" te permet de voir dans la sortie console de Logstash, les documents JSON qui vont être indexés dans ES, en particulier, l'évènement agrégé généré par le plugin aggregate lors du timeout.
  • maintenant qu'on voit bien que le plugin aggregate fait son boulot, tu peux éventuellement remettre en place la suppression des évènements non agrégés :
if "aggregate" not in [tags] {
    drop{}
}

#10

Bonjour M. fbaligand,

J'ai fait les tests et ça marche effectivement un tableau.

Mes logiciels sont regroupés dans un tableau par PC:

Cependant j'aimerais ne pas avoir des doublons (mais avoir distinct logiciel) dans le tableau.
Pour cela j'avais pensé un if sur le tableau pour vérifier sur un logiciel n'existe pas avant ajout. Mais je me demande s'il n'y pas une méthode (fonction) plus optimale dans le plugin aggregate qui permet de s'assurer qu'une donnée n'est ajouté qu'une fois au tableau.

Merci pour votre aide!


(Fabien Baligand) #11

Tu peux faire ça avec cette ligne de code dans le filtre aggregate :

map['logiciel'] << event.get('logiciel') unless map[‘logiciel’].include?(event.get('logiciel'))

Au passage, tu peux retrouver les méthodes disponibles dans la doc Ruby.
Ici la doc sur les tableaux :
https://docs.ruby-lang.org/en/2.0.0/Array.html


#12

Ça marche nickel!

Merci pour ton aide :grin:


(Fabien Baligand) #13

Content d’avoir pu t’aider :slight_smile:


(system) #14

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.