Extract the Key from MQTT JSON message

Hi All,

I have a problem with my test device, which supports MQTT. Here is the MQTT message:

{"device_info": {"uuid": "XXXXXX","fw_ver": "V1.5.0" },"measures": [{"n":"co2", "u":"ppm", "v":1155.995},{"n":"voc", "u":"ppm", "v":0.000e-3},{"n":"co", "u":"ppm", "v":3.898},{"n":"pm10", "u":"ug/m3", "v":3.713},{"n":"pm2.5", "u":"ug/m3", "v":3.713},{"n":"temp", "u":"Cel", "v":22.515},{"n":"hum", "u":"%RH", "v":63.681},{"n":"prb", "u":"hPa", "v":1007.199},{"n":"pm1", "u":"ug/m3", "v":3.511},{"n":"pm4", "u":"ug/m3", "v":3.713},{"n":"iaqi", "u":"count", "v":66},{"n":"tci", "u":"count", "v":84},{"n":"eiaqi", "u":"count", "v":5}]}

It is a sensor payload where n is the name, u the unit, and v the value.

So, if I want to put all of this into ELK through the Filebeat MQTT input, I need to read the n, u, and v, then feed them into the index.

Here are my questions:

  1. Are there any Filebeat functions that can do this? Can dissect do the work ( Dissect strings | Filebeat Reference [7.16] | Elastic )?
  2. Also, is it possible to use a dynamic template (in an index template) to create the index?

Thanks in advance!



If I understand correctly, you would like to insert one document in ES for each of the {"n":"","u":"","v":""} objects contained inside the measures array, am I correct? So from your example

{"device_info": {"uuid": "XXXXXX","fw_ver": "V1.5.0" },"measures": [{"n":"co2", "u":"ppm", "v":1155.995},{"n":"voc", "u":"ppm", "v":0.000e-3},{"n":"co", "u":"ppm", "v":3.898},{"n":"pm10", "u":"ug/m3", "v":3.713},{"n":"pm2.5", "u":"ug/m3", "v":3.713},{"n":"temp", "u":"Cel", "v":22.515},{"n":"hum", "u":"%RH", "v":63.681},{"n":"prb", "u":"hPa", "v":1007.199},{"n":"pm1", "u":"ug/m3", "v":3.511},{"n":"pm4", "u":"ug/m3", "v":3.713},{"n":"iaqi", "u":"count", "v":66},{"n":"tci", "u":"count", "v":84},{"n":"eiaqi", "u":"count", "v":5}]}

in ES you expect the following documents:

{"n":"co2", "u":"ppm", "v":1155.995}

{"n":"voc", "u":"ppm", "v":0.000e-3}

{"n":"co", "u":"ppm", "v":3.898}

{"n":"pm10", "u":"ug/m3", "v":3.713}

{"n":"pm2.5", "u":"ug/m3", "v":3.713}

{"n":"temp", "u":"Cel", "v":22.515}

{"n":"hum", "u":"%RH", "v":63.681}

{"n":"prb", "u":"hPa", "v":1007.199}

{"n":"pm1", "u":"ug/m3", "v":3.511}

{"n":"pm4", "u":"ug/m3", "v":3.713}

{"n":"iaqi", "u":"count", "v":66}

{"n":"tci", "u":"count", "v":84}

{"n":"eiaqi", "u":"count", "v":5}

If that is the case, this is not currently supported by Filebeat or ingest processors: they expect one event in and one event out, while you would need one event in and N events out. One solution could be to change the MQTT message format so that each message contains only one measurement, and then ingest the messages separately. Alternatively, you could use the script or decode_json_fields processors to flatten the list and have all measurements at the document root, which would help a lot when querying or using them in dashboards. As a last resort, if switching from MQTT to HTTP/S is an option, the httpjson input is capable of creating one document for each measurement using its split configuration.
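The flattening idea described above can be tried outside the stack first. A minimal Python sketch, using a payload trimmed to two of the measurements from the thread, showing how the measures array collapses into top-level sensor keys:

```python
import json

# Hypothetical SenML-style payload, trimmed to two measurements from the thread.
payload = ('{"device_info": {"uuid": "XXXXXX", "fw_ver": "V1.5.0"},'
           ' "measures": [{"n": "co2", "u": "ppm", "v": 1155.995},'
           ' {"n": "temp", "u": "Cel", "v": 22.515}]}')

doc = json.loads(payload)

# Flatten the measures array so each sensor name becomes a top-level key.
flat = {m["n"]: m["v"] for m in doc["measures"]}
print(flat)  # {'co2': 1155.995, 'temp': 22.515}
```

This works for any number of sensors, since the keys come from the data itself rather than from a fixed pattern.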

LMK if this helped in any way! :slight_smile:

Logstash has a split filter to split one event into multiple events: Split filter plugin | Logstash Reference [7.16] | Elastic. I actually just started working on replicating that capability for Filebeat inputs, though it might be a while before it comes to fruition (if it does): Add new module to split events by legoguy1000 · Pull Request #29951 · elastic/beats · GitHub.

Hi Marc,

Thanks for your reply and for spending time elaborating on the format.

What I would like is to insert a document like the one below, with all the attributes in the same document:

{"voc": 0.000e-3, "co": 3.898, "pm10": 3.713 .........}



Hi Legoguy1000,

If I use split, there would be multiple n, u, and v keys; would that work?

PS. I haven't used the split function before.


Using the Logstash split filter, you'd tell it to split on the measures key, and you'd end up with documents like {"device_info": {"uuid": "XXXXXX","fw_ver": "V1.5.0" },"measures": {"n":"co2", "u":"ppm", "v":1155.995}}
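A minimal Logstash sketch of that, assuming the raw JSON payload arrives in the message field (field names are assumptions based on the thread):

```
filter {
  json {
    source => "message"
  }
  split {
    field => "measures"
  }
}
```

Each element of the measures array then becomes its own event, with the rest of the document copied into every one.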

Good point @legoguy1000 thanks!

I see what you are trying to do now @folderman , didn't understand at first.

To achieve this you have a couple of options:

1- If you want to use ingest pipelines, you can use the foreach processor:

  - foreach:
      field: mqtt.measures
      ignore_missing: true
      processor:
        set:
          field: '{{_ingest._value.n}}'
          copy_from: _ingest._value.v

This will convert an input like {..."measures": [{"n":"co2", "u":"ppm", "v":1155.995},{"n":"voc", "u":"ppm", "v":0.000e-3}...]...} into {..."co2":1155.995,"voc":0.000e-3...} which is what I think you want.
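A quick way to verify this before wiring it into Filebeat is the _simulate API. A sketch with a trimmed-down document (field names as in the thread):

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "foreach": {
          "field": "mqtt.measures",
          "ignore_missing": true,
          "processor": {
            "set": {
              "field": "{{_ingest._value.n}}",
              "copy_from": "_ingest._value.v"
            }
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "mqtt": {
          "measures": [
            { "n": "co2", "u": "ppm", "v": 1155.995 },
            { "n": "temp", "u": "Cel", "v": 22.515 }
          ]
        }
      }
    }
  ]
}
```

The response shows the resulting document, so you can confirm the top-level keys appear before creating the pipeline for real.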

2- If you do not want to or can't use ingest pipelines, an easy alternative could be using a filebeat script processor similar to:

  - script:
      lang: javascript
      id: my_filter
      source: >
        function process(event) {
            var measures = event.Get("mqtt.measures");
            measures.forEach(function (m) {
                event.Put(m.n, m.v);
            });
            return event;
        }

Hope this helps! :smiley:

Hi @legoguy1000 and @marc.guasch,

Thanks very much, you both gave me a lot of insight; let me take a look at the split and foreach processors. It seems I now have a direction on how to do this.

Thanks, guys!


Ohh, I also misread. Yeah, you don't need the split processor; what @marc.guasch posted is the right way to do what you want. As for the dynamic index, you can use a flattened field for measures, and that way you don't need to ID each key.
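A minimal sketch of such a mapping, assuming a hypothetical senml-* index pattern (the names are illustrative):

```
PUT _index_template/senml
{
  "index_patterns": ["senml-*"],
  "template": {
    "mappings": {
      "properties": {
        "measures": { "type": "flattened" }
      }
    }
  }
}
```

With a flattened field, every key inside measures is indexed as a keyword without having to declare each sensor name up front.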

Hi Both,

I have a logical problem: for the field mqtt.measures to work, I think I need to decode the JSON first (correct me if I'm wrong). However, if I use decode_json_fields, the multiple n, u, v keys cause an error.
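For reference, a decode_json_fields sketch that decodes the whole payload under mqtt before the ingest pipeline runs (the field names are assumptions based on the thread, and process_array is needed for the measures list):

```yaml
processors:
  - decode_json_fields:
      fields: ["message"]
      target: "mqtt"
      process_array: true
      max_depth: 2
```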

Here is the pipeline I tried to create:

PUT _ingest/pipeline/senml
{
  "processors": [
    {
      "foreach": {
        "field": mqtt.measure,
        "processor": {
          "set": {
            "field": "{{_ingest._value.n}}",
            "copy_from": "_ingest._value.v"
          }
        }
      }
    }
  ]
}

The below error shows:

"reason" : "Unrecognized token 'mqtt': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (ByteArrayInputStream); line: 5, column: 24]"

Actually, I tried changing mqtt.measure to message (just as a test), but it still didn't work.

I also tried to use grok. With the Grok Debugger provided by ELK I can extract the correct KV pairs, but when I create the pipeline, it shows:

      "reason" : "Unexpected close marker '}': expected ']' (for Array starting at [Source: (ByteArrayInputStream); line: 7, column: 21])\n at [Source: (ByteArrayInputStream); line: 8, column: 116]"

GROK Pattern:
{"device_info": {"uuid": "%{UUID:uuid}","fw_ver": "V%{INT:major}.%{INT:minor}.%{INT:patch}\" },"measures": \[{"n":"%{WORD:sensor1}", "u":"ppm", "v":%{NUMBER:value1}},{"n":"%{WORD:sensor2}", "u":"ppm", "v":%{NUMBER:value2a}e%{INT:value2b}},{"n":"%{WORD:sensor3}","u":"ppm", "v":%{NUMBER:value3}},{"n":"%{WORD:sensor4}", "u":"ug/m3", "v":%{NUMBER:value4}},{"n":%{QS:sensor5}, "u":"ug/m3", "v":%{NUMBER:value5}},{"n":"%{WORD:sensor6}", "u":"Cel", "v":%{NUMBER:value6}},{"n":"%{WORD:sensor7}", "u":"%RH", "v":%{NUMBER:value7}},{"n":"%{WORD:sensor8}", "u":"hPa", "v":%{NUMBER:value8}},,{"n":"%{WORD:sensor9}", "u":"ug/m3", "v":%{NUMBER:value9}},{"n":"%{WORD:sensor10}", "u":"ug/m3", "v":%{NUMBER:value10}},{"n":"%{WORD:sensor11}", "u":"count", "v":%{NUMBER:value11}},{"n":"%{WORD:sensor12}", "u":"count", "v":%{NUMBER:value12}},{"n":"%{WORD:sensor13}", "u":"count", "v":%{NUMBER:value13}}]}

Actually, the format I got from MQTT is called SenML (RFC 8428), which is used for IoT devices.

Any ways to solve the problem are welcome.


PS. I think the pipeline approach is very close to the destination.

Could you share the message as it gets sent by Filebeat? It would help to figure out what the pipeline should look like, thanks!

Hi Marc,

I finally used dissect to get things done (although it's not elegant). Anyway, if you're interested in seeing how to build an input for SenML, here is the message collected by Elasticsearch:

{"device_info": {"uuid": "XXXXX-XXXX-XXXX-XXXX-XXXX","fw_ver": "V1.5.0" },"measures": [{"n":"co2", "u":"ppm", "v":989.493},{"n":"voc", "u":"ppm", "v":93.000e-3},{"n":"co", "u":"ppm", "v":2.779},{"n":"pm10", "u":"ug/m3", "v":6.182},{"n":"pm2.5", "u":"ug/m3", "v":6.182},{"n":"temp", "u":"Cel", "v":21.650},{"n":"hum", "u":"%RH", "v":64.808},{"n":"prb", "u":"hPa", "v":1008.909},{"n":"pm1", "u":"ug/m3", "v":5.846},{"n":"pm4", "u":"ug/m3", "v":6.182},{"n":"iaqi", "u":"count", "v":68},{"n":"tci", "u":"count", "v":79},{"n":"eiaqi", "u":"count", "v":5}]}

If we could ingest the SenML format dynamically then, in the future, no matter how many sensors the device includes, the index could be created easily.


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.