Logstash - Conversion de date

Bonjour tout le monde !

Grâce à Logstash j'indexe toute une table d'une base de donnée MySQL, cependant je n'arrive pas a filtrer correctement un champs pour qu'il soit transformé en quelque chose de lisible par kibana.

Voici mon fichier de conf logstash :

input {
    	jdbc {
    		jdbc_connection_string => "jdbc:mysql://adresse.to.my.mysql.bdd:3306/BDD_NAME"
    		jdbc_user => "xxxxxxxxxxxxxx"
    		jdbc_password => "xxxxxxxxxxxxxxx"
    		jdbc_driver_library => "/path/to/jdbc/drivers/mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar"
    		jdbc_driver_class => "com.mysql.jdbc.Driver"
    		statement => "SELECT * FROM ma_table"
    	}
    }

    filter {
    	date {
    		match => [ "date_submitted", "UNIX" ]

    	}
    }

    output {
    	stdout { codec => json_lines }
    	elasticsearch {
    		"hosts" => "localhost:9200"
    		"index" => "my_index"
    		"document_type" => "data"
    	}
    }

Voyez vous quelque chose que j'aurai mal fait ?

Je viens de trouver la solution j'ai oublié la ligne target

input {
jdbc {
jdbc_connection_string => "jdbc:mysql://adresse.to.my.mysql.bdd:3306/BDD_NAME"
jdbc_user => "xxxxxxxxxxxxxx"
jdbc_password => "xxxxxxxxxxxxxxx"
jdbc_driver_library => "/path/to/jdbc/drivers/mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
statement => "SELECT * FROM ma_table"
}
}

filter {
date {
match => [ "date_submitted", "UNIX" ]
target => "date_submitted"

}
}

output {
stdout { codec => json_lines }
elasticsearch {
"hosts" => "localhost:9200"
"index" => "my_index"
"document_type" => "data"
}
}

Ce fichier de conf fonctionne parfaitement

Super!. Merci d'avoir partagé ta solution !

Si tu pouvais formater ton code avec

```
CODE
```

Ca serait parfait! (tu peux rééditer tes posts si tu veux)

Ca y est c'est fait !

Petite question supplémentaire :
Dans les documents que j'indexe, j'ai plusieurs champs number
Severity
Status
Priorite

Ces champs ne peuvent avoir que certaines valeurs (10, 20, 30, 40, 50, 60, 70, 80,90)
Chacune de ces valeurs corresponds à un niveau de sévérité, un status ou une priorité comme vous vous en doutez.

J'essaye donc dans mon fichier de conf Logstash de faire remplacer ces valeurs number par leur correspondance en String j'ai essayé ca mais sans succès

filter {
	if [status] == "10" {
	mutate {
		replace => [ "status", "New" ]
	}
	}
}

Une petite piste ?

Solution trouvé également, il fallait remplacer
if [status] == "10"
par
if [status] == 10

Vu que status est de type number

Du coup j'ai fini tout mes filtres pour changer mes données mais le temps import est relativement long maintenant, puis-je optimiser mon fichier de conf ?
cf ci-dessous

input {
	//input
	}
}

filter {
	if [status] == 10 {
	mutate {
		replace => [ "status", "New" ]
	}
	}else if [status] == 20 {
	mutate {
		replace => [ "status", "Feedback" ]
	}
	}else if [status] == 30 {
	mutate {
		replace => [ "status", "Acknowledged" ]
	}
	}else if [status] == 40 {
	mutate {
		replace => [ "status", "Confirmed" ]
	}
	}else if [status] == 50 {
	mutate {
		replace => [ "status", "Assigned" ]
	}
	}else if [status] == 80 {
	mutate {
		replace => [ "status", "Resolved" ]
	}
	}else if [status] == 90 {
	mutate {
		replace => [ "status", "Closed" ]
	}
	}
}

filter {
	if [priority] == 10 {
	mutate {
		replace => [ "priority", "none" ]
	}
	}else if [priority] == 20 {
	mutate {
		replace => [ "priority", "low" ]
	}
	}else if [priority] == 30 {
	mutate {
		replace => [ "priority", "normal" ]
	}
	}else if [priority] == 40 {
	mutate {
		replace => [ "priority", "high" ]
	}
	}else if [priority] == 50 {
	mutate {
		replace => [ "priority", "urgent" ]
	}
	}else if [priority] == 80 {
	mutate {
		replace => [ "priority", "immediate" ]
	}
	}
}

filter {
	if [severity] == 20 {
	mutate {
		replace => [ "severity", "trivial" ]
	}
	}else if [severity] == 50 {
	mutate {
		replace => [ "severity", "minor" ]
	}
	}else if [severity] == 60 {
	mutate {
		replace => [ "severity", "major" ]
	}
	}else if [severity] == 80 {
	mutate {
		replace => [ "severity", "block" ]
	}
	}
}

filter {
	date {
		match => [ "date_submitted", "UNIX" ]
		target => "date_submitted"

	}
}

filter {
	date {
		match => [ "last_updated", "UNIX" ]
		target => "last_updated"

	}
}

output {
	//output
	}
}

Regarde si le translate filter ne serait pas plus efficace... https://www.elastic.co/guide/en/logstash/6.0/plugins-filters-translate.html

Merci Dadoo !

Je me posais une petite question concernant le fonctionnement de logstash une fois lancé comme un service.

La pippeline que j'ai défini va donc indexer toutes les entrées d'une table provenant d'une base de donnée MySQL.
Les nouvelles entrées faite dans la base de données vont-elle être indexer automatiquement dans elasticsearch ?

Comme là tu n'as pas mis de schedule, c'est une opération one shot.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.