Hi!
I am trying to set up a pipeline that ingests logs from a Kafka cluster into Elasticsearch using Logstash.
The messages on Kafka are JSON strings like:
{"a"="123", "b"="456", msg="this is a test"}
Sometimes, when the msg field contains a certain keyword, it carries extra information that I need to parse and add to the JSON itself so that it can be indexed in Elasticsearch. The number of fields inside the msg field is dynamic. For example:
{"a"="123", "b"="456", msg="HEATLTH_CHECK CPU=4 LOAD=123"}
I want to transform it into:
{"a"="123", "b"="456", msg="HEATLTH_CHECK CPU=4 LOAD=123", "CPU"=4, "LOAD"=123}
I looked at grok, but I am not sure it can handle this, because the field names (like CPU and LOAD) may change and the number of fields can change too. The only fixed parts are the keyword HEALTH_CHECK and the space-delimited key=value format, and the extracted field names won't conflict with the "outer" fields like "a" and "b".
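To make the question concrete, here is a rough sketch of the kind of filter I have been considering, assuming the kv filter can be pointed at the msg field once the Kafka input has already decoded the JSON (e.g. with codec => json). I have not tested this:

filter {
  # Only try to split out key=value pairs when msg starts with HEALTH_CHECK
  if [msg] =~ /^HEALTH_CHECK/ {
    kv {
      source      => "msg"   # parse the msg field, not the whole event
      field_split => " "     # pairs are separated by spaces
      value_split => "="     # key and value are separated by =
    }
  }
}

Would something like this cope with an arbitrary, changing set of key names?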
Has anyone run into a similar issue? I am willing to write or customize a bit of Ruby code (I understand Logstash plugins are written in Ruby) if needed.
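If a ready-made filter is not flexible enough, is a ruby filter along these lines the right direction? This is just a sketch of what I mean, not something I have run:

filter {
  if [msg] =~ /^HEALTH_CHECK/ {
    ruby {
      code => '
        # Split the space-delimited payload and copy each key=value pair
        # onto the event as its own field; tokens without "=" (like the
        # HEALTH_CHECK keyword itself) are skipped.
        event.get("msg").split(" ").each do |pair|
          key, value = pair.split("=", 2)
          event.set(key, value) unless value.nil?
        end
      '
    }
  }
}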
Thanks!
Li