Can I enrich a lookup source (nested) with the jdbc_static filter?

I have a source with a generator input, shown here:

    generator {
      lines => [
        '{
          "macAddr": "d880397a3b0f",
          "sensors": [
            {"sensor-id": "s01", "sensor-val": 31},
            {"sensor-id": "s02", "sensor-val": 32},
            {"sensor-id": "s03", "sensor-val": 33}
          ],
          "@timestamp": "2019-08-08T12:23:34Z"
        }'
      ]
      count => 1
      codec => json { }
    }

and the filter is here:

jdbc_static {
    loaders => [
      {
        id => "remote-device"
        query => "select deviceId, deviceName, spotId, spotName, locGbCode, locGbName, ipAddr, macAddr from rs_temp_device_t"
        local_table => "localDevice"
      },
      {
        id => "remote-sensor"
        query => "select deviceId, sensorId, sensorName, sensorUnit from rs_temp_device_sensor_t"
        local_table => "localSensor"
      }
    ]

    local_db_objects => [
      {
        name => "localDevice"
        index_columns => ["deviceId", "macAddr"]
        columns => [
          ["deviceId", "varchar(10)"],
          ["deviceName", "varchar(20)"],
          ["spotId", "varchar(20)"],
          ["spotName", "varchar(20)"],
          ["locGbCode", "varchar(20)"],
          ["locGbName", "varchar(20)"],
          ["ipAddr", "varchar(20)"],
          ["macAddr", "varchar(20)"]
        ]
      },
      {
        name => "localSensor"
        index_columns => ["deviceId", "sensorId"]
        columns => [
          ["deviceId", "varchar(10)"],
          ["sensorId", "varchar(20)"],
          ["sensorName", "varchar(20)"],
          ["sensorUnit", "varchar(20)"]
        ]
      }
    ]

    local_lookups => [
        {
          query => "select deviceId, deviceName, spotId, spotName, locGbCode, locGbName, ipAddr from localDevice where macAddr = :macAddr"
          parameters => { macAddr => "[macAddr]" }
          target => "device"
        },
        {
          query => "select sensorName, sensorUnit from localSensor where sensorId = :sensorId"
          parameters => { sensorId => "[ #sensors > sensor-id# ]"}
          target => "sensor"
        }
    ]

    add_field => { deviceId => "%{[device][0][deviceid]}" }
    add_field => { deviceName => "%{[device][0][devicename]}" }
    add_field => { spotId => "%{[device][0][spotid]}" }
    add_field => { spotName => "%{[device][0][spotname]}" }
    add_field => { locGbCode => "%{[device][0][locgbcode]}" }
    add_field => { locGbName => "%{[device][0][locgbname]}" }
    add_field => { ipAddr => "%{[device][0][ipaddr]}" }

    add_field => { sensorName => "%{[sensor][0][sensorname]}" }
    add_field => { sensorUnit => "%{[sensor][0][sensorunit]}" }

    remove_field => ["device", "sensor"]
    # {jdbc_settings....}
}

Can I look up the sensors' (nested list) name and unit data from the localSensor table using each individual sensor-id?

parameters => { sensorId => "[ #sensors > sensor-id# ]"}

I don't think this will work as expected, as you don't have a field called #sensors > sensor-id# (the bit in between the square brackets).
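For reference, addressing one element of the nested array would use the standard field reference syntax, e.g. (a sketch; [sensors][0] is the first element of the array):

    parameters => { sensorId => "[sensors][0][sensor-id]" }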

Look at the parameters section of the plugin docs for local_lookups; you probably don't need interpolation.

However, if you really need a prefix, you can move it into the query, since (in your example) the prefix is fixed and not dependent on a value from the event...

So this should be OK.

          query => "select sensorName, sensorUnit from localSensor where sensorId = :sensorId"
          parameters => { sensorId => "[sensor-id]" }

Bear in mind that if you do WHERE sensorId > :sensorId you will get 0, 1, or many entries in the array value of the sensor field.

Lookups usually want to find a single matching data point, unless your source value (in your case sensorId) is vague and you will use further logic to pick the most appropriate match from the entries returned by the lookup.

I'm Korean. I can't speak English very well.

I want the lookup result to go

from

"sensors": [
          {"sensor-id": "s01", "sensor-val": 31},
          {"sensor-id": "s02", "sensor-val": 32},
          {"sensor-id": "s03", "sensor-val": 33}
        ]

to

"sensors": [
          {"sensor-id": "s01", "sensor-val": 31, "sensor-name": "sensorNm1"},
          {"sensor-id": "s02", "sensor-val": 32, "sensor-name": "sensorNm2"},
          {"sensor-id": "s03", "sensor-val": 33, "sensor-name": "sensorNm3"}
        ]

Is it possible?

I wonder what I should change to get the result I want:

  • the "local_lookups" field?
  • or should I add or combine another filter?

OK, I understand. Apologies, I did not look at the data correctly.

You will need to use the split filter to create a new event from each element in the sensors array. I think you would want single data points in your Kibana graphs anyway.
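For example (a minimal sketch; the full workup below includes exactly this step):

    filter {
      # one event carrying a 3-element [sensors] array becomes three events,
      # each carrying a single {"sensor-id": ..., "sensor-val": ...} hash in [sensors]
      split { field => "[sensors]" }
    }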

You can use a JOIN query in a single lookup.

This is a full workup; I used Postgres, so I changed the column names:


input {
  generator {
    lines => [
      '{
        "mac_addr": "d880397a3b0f",
        "sensors": [
          {"sensor_id": "s01", "sensor_val": 31},
          {"sensor_id": "s02", "sensor_val": 32},
          {"sensor_id": "s03", "sensor_val": 33}
        ],
        "@timestamp": "2019-08-08T12:23:34Z"
      }'
    ]
    count => 1
    codec => json { }
  }
}

filter {
  split { field => "[sensors]" }
  jdbc_static {
    loaders => [
      {
        id => "remote-device"
        query => 'select "device_id", "device_name", "spot_id", "spot_name", "loc_gb_code", "loc_gb_name", "ip_addr", "mac_addr" from rs_temp_device_t'
        local_table => "localDevice"
      },
      {
        id => "remote-sensor"
        query => 'select "device_id", "sensor_id", "sensor_name", "sensor_unit" from rs_temp_device_sensor_t'
        local_table => "localSensor"
      }
    ]
    local_db_objects => [
      {
        name => "localDevice"
        index_columns => ["device_id", "mac_addr"]
        columns => [
          ["device_id", "varchar(20)"],
          ["device_name", "varchar(20)"],
          ["spot_id", "varchar(20)"],
          ["spot_name", "varchar(20)"],
          ["loc_gb_code", "varchar(20)"],
          ["loc_gb_name", "varchar(20)"],
          ["ip_addr", "varchar(20)"],
          ["mac_addr", "varchar(20)"]
        ]
      },
      {
        name => "localSensor"
        index_columns => ["device_id", "sensor_id"]
        columns => [
          ["device_id", "varchar(20)"],
          ["sensor_id", "varchar(20)"],
          ["sensor_name", "varchar(20)"],
          ["sensor_unit", "varchar(20)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "SELECT a.device_id, device_name, spot_id, spot_name, loc_gb_code, loc_gb_name, ip_addr, mac_addr, sensor_name, sensor_unit
                  FROM localDevice a
                  INNER JOIN localSensor b ON a.device_id = b.device_id
                  WHERE a.mac_addr = :macaddr AND b.sensor_id = :sensorid"
        parameters => {
          "macaddr" => "[mac_addr]"
          "sensorid" => "[sensors][sensor_id]"
        }
        target => "device_sensor"
      }
    ]
    staging_directory => "/elastic/tmp/logstash-6.3.2/data/jdbc_static/import_data"
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/elastic/tmp/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/ls_test_2"
  }
}
output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

Results:

{
         "sequence" => 0,
       "@timestamp" => 2019-08-08T12:23:34.000Z,
          "sensors" => {
         "sensor_id" => "s01",
        "sensor_val" => 31
    },
         "mac_addr" => "d880397a3b0f",
             "host" => "Elastics-MacBook-Pro.local",
         "@version" => "1",
    "device_sensor" => [
        [0] {
            "device_name" => "Device 1",
            "loc_gb_code" => "code01",
              "device_id" => "dev01",
               "mac_addr" => "d880397a3b0f",
                "spot_id" => "spot01",
            "loc_gb_name" => "Code 1",
            "sensor_unit" => "meter",
                "ip_addr" => "1.1.1.1",
              "spot_name" => "Spot 1",
            "sensor_name" => "Sensor 1"
        }
    ]
}
{
         "sequence" => 0,
       "@timestamp" => 2019-08-08T12:23:34.000Z,
          "sensors" => {
         "sensor_id" => "s02",
        "sensor_val" => 32
    },
         "mac_addr" => "d880397a3b0f",
             "host" => "Elastics-MacBook-Pro.local",
         "@version" => "1",
    "device_sensor" => [
        [0] {
            "device_name" => "Device 1",
            "loc_gb_code" => "code01",
              "device_id" => "dev01",
               "mac_addr" => "d880397a3b0f",
                "spot_id" => "spot01",
            "loc_gb_name" => "Code 1",
            "sensor_unit" => "meter",
                "ip_addr" => "1.1.1.1",
              "spot_name" => "Spot 1",
            "sensor_name" => "Sensor 2"
        }
    ]
}
{
         "sequence" => 0,
       "@timestamp" => 2019-08-08T12:23:34.000Z,
          "sensors" => {
         "sensor_id" => "s03",
        "sensor_val" => 33
    },
         "mac_addr" => "d880397a3b0f",
             "host" => "Elastics-MacBook-Pro.local",
         "@version" => "1",
    "device_sensor" => [
        [0] {
            "device_name" => "Device 1",
            "loc_gb_code" => "code01",
              "device_id" => "dev01",
               "mac_addr" => "d880397a3b0f",
                "spot_id" => "spot01",
            "loc_gb_name" => "Code 1",
            "sensor_unit" => "meter",
                "ip_addr" => "1.1.1.1",
              "spot_name" => "Spot 1",
            "sensor_name" => "Sensor 3"
        }
    ]
}

This was written using Google Translate.

I tried to use the split filter, but it seems it cannot be used, because the number of events increases.
We considered the aggregate filter to combine the events back into one, but it was excluded as a candidate because the number of workers would have to be limited to one.
So I decided to modify the input source instead (processed using the script processor in Filebeat).

device (writes a logfile) -> Filebeat input

{"S01": 31, "S02": 32, "macAddr": "d880397a3b0f", "S03": 30}
{"S01": 32, "S02": 32, "macAddr": "d880397a3b0f", "S03": 31}
{"S01": 31, "S02": 32, "macAddr": "d880397a3b0f", "S03": 32}

Filebeat output (with script processor) -> Logstash input

{
    "@timestamp": "2019-08-13T09:20:46.082Z",
    "macAddr": "d880397a3b0f",
    "sensor-id-1": "S01",
    "sensor-id-2": "S02",
    "sensor-val-3": "31",
    "sensor-val-5": "",
    "sensor-id-6": "",
    "sensor-id-8": "",
    "sensor-id-10": "",
    "sensor-val-2": 30,
    "sensor-id-3": "S03", 
    "sensor-val-4": "",
    "sensor-id-5": "",
    "sensor-val-1": 32,
    "sensor-count": 3,
    "sensor-id-4": "",
    "sensor-val-6": "",
    "sensor-val-7": "",
    "sensor-val-9": "",
    "sensor-val-10": "",
    "sensor-id-7": "",
    "sensor-val-8": "",
    "sensor-id-9": ""
  }

We expect 1 to 10 sensors per device, and about 20,000 devices. Of course, more devices can be added.
Each device transmits one data point per minute.
Therefore, increasing the number of events should be avoided.

So the plan is as follows:
extract the sensor data, bring it up to the root level, and run 10 queries (although some of them may be unnecessary).

Input: the Filebeat output shown above.

Filter:

filter {
  jdbc_static {
    loaders => [
      {
        id => "remote-device"
        query => "select deviceId, deviceName, spotId, spotName, locGbCode, locGbName, ipAddr, macAddr from rs_temp_device_t"
        local_table => "localDevice"
      },
      {
        id => "remote-sensor"
        query => "select deviceId, sensorId, ifnull(sensorName, ''), ifnull(sensorUnit, '') from rs_temp_device_sensor_t"
        local_table => "localSensor"
      }
    ]
  
    local_db_objects => [
      {
        name => "localDevice"
        index_columns => ["deviceId", "macAddr"]
        columns => [
          ["deviceId", "varchar(10)"],
          ["deviceName", "varchar(20)"],
          ["spotId", "varchar(20)"],
          ["spotName", "varchar(20)"],
          ["locGbCode", "varchar(20)"],
          ["locGbName", "varchar(20)"],
          ["ipAddr", "varchar(20)"],
          ["macAddr", "varchar(20)"]
        ]
      },
      {
        name => "localSensor"
        index_columns => ["deviceId", "sensorId"]
        columns => [
          ["deviceId", "varchar(10)"],
          ["sensorId", "varchar(20)"],
          ["sensorName", "varchar(20)"],
          ["sensorUnit", "varchar(20)"]
        ]
      }
    ]
  
    local_lookups => [
        {
          query => "select deviceId, deviceName, spotId, spotName, locGbCode, locGbName, ipAddr from localDevice where macAddr = :macAddr"
          parameters => { macAddr => "[macAddr]" }
          target => "device"
        },
        {
          query => "select sensorName, sensorUnit from localSensor where sensorId = :sensorId"
          parameters => { sensorId => "[sensor-id-1]"}
          target => "sensor1"
        },
        (.. QUERIES ..)
        {
          query => "select sensorName, sensorUnit from localSensor where sensorId = :sensorId"
          parameters => { sensorId => "[sensor-id-10]"}
          target => "sensor10"
        }
    ]

    add_field => { deviceId => "%{[device][0][deviceid]}" }
    add_field => { deviceName => "%{[device][0][devicename]}" }
    add_field => { spotId => "%{[device][0][spotid]}" }
    add_field => { spotName => "%{[device][0][spotname]}" }
    add_field => { locGbCode => "%{[device][0][locgbcode]}" }
    add_field => { locGbName => "%{[device][0][locgbname]}" }
    add_field => { ipAddr => "%{[device][0][ipaddr]}" }
  
    add_field => { "[sensor-name-1]" => "%{[sensor1][0][sensorname]}" }
    add_field => { "[sensor-unit-1]" => "%{[sensor1][0][sensorunit]}" }
    ((.. ADD FIELDS ..))
    add_field => { "[sensor-name-10]" => "%{[sensor10][0][sensorname]}" }
    add_field => { "[sensor-unit-10]" => "%{[sensor10][0][sensorunit]}" }

    
    remove_field => ["device", "sensor1", "sensor2", "sensor3", "sensor4", "sensor5", "sensor6", "sensor7", "sensor8", "sensor9", "sensor10"]
    staging_directory => "/etc/logstash/vendor/staging"
  
    loader_schedule => "* */2 * * *"
  
    ((.. JDBC SETTINGS ..))
  }
  # remove fields that are empty or still hold an unresolved %{...}
  # reference (i.e. fields whose lookup returned nothing)
  ruby {
    code => "
      event.to_hash.each do |k, v|
        if v.is_a?(String) and (v.start_with?('%{') or v.empty?)
          event.remove(k)
        end
      end
    "
  }
}

If you can give me advice on reducing the number of queries, I'll take it, and then I'll close this thread.

Thank you very much.

There are a few ideas to consider.

-- using split

  1. Use Beats parallelism to distribute the work across 10 (for example) LS pipelines. Only one of the pipelines needs the loaders and local_db_objects defined, because the jdbc_static local DB is a single instance JVM-wide and will be initialised once when the pipelines are compiled; each pipeline's jdbc_static filter will still need the local_lookups defined (see the sketch after this list).
  2. Store the one-sensor-per-device documents in a separate Elasticsearch index.
  3. Use Elasticsearch aggregations to build a new index holding documents that each contain all sensors for one device. Watcher can do this.
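For idea 1, a minimal sketch of the layout (pipeline file names are hypothetical, and it assumes, per the note above, that loaders and local_db_objects can be omitted from the lookup-only pipelines):

pipeline-loader.conf (the only pipeline that defines loaders/local_db_objects):

filter {
  split { field => "[sensors]" }
  jdbc_static {
    loaders => [
      {
        id => "remote-sensor"
        query => 'select "device_id", "sensor_id", "sensor_name", "sensor_unit" from rs_temp_device_sensor_t'
        local_table => "localSensor"
      }
    ]
    local_db_objects => [
      {
        name => "localSensor"
        index_columns => ["device_id", "sensor_id"]
        columns => [
          ["device_id", "varchar(20)"],
          ["sensor_id", "varchar(20)"],
          ["sensor_name", "varchar(20)"],
          ["sensor_unit", "varchar(20)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "SELECT sensor_name, sensor_unit FROM localSensor WHERE sensor_id = :sensorid"
        parameters => { "sensorid" => "[sensors][sensor_id]" }
        target => "sensor"
      }
    ]
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/elastic/tmp/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/ls_test_2"
  }
}

pipeline-2.conf .. pipeline-10.conf (lookups only, sharing the same local DB):

filter {
  split { field => "[sensors]" }
  jdbc_static {
    local_lookups => [
      {
        query => "SELECT sensor_name, sensor_unit FROM localSensor WHERE sensor_id = :sensorid"
        parameters => { "sensorid" => "[sensors][sensor_id]" }
        target => "sensor"
      }
    ]
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/elastic/tmp/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/ls_test_2"
  }
}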

-- not using split

  1. Think about using a single second query (target => "sensors") that uses the deviceId added by the first query to return all sensor rows for that device as a multi-element array, then use add_field to do this (or a ruby filter to loop through the [sensors] field array value):
    add_field => { "[sensor-name-1]" => "%{[sensors][0][sensorname]}" }
    add_field => { "[sensor-unit-1]" => "%{[sensors][0][sensorunit]}" }
    add_field => { "[sensor-name-2]" => "%{[sensors][1][sensorname]}" }
    add_field => { "[sensor-unit-2]" => "%{[sensors][1][sensorunit]}" }
...
    add_field => { "[sensor-name-10]" => "%{[sensors][9][sensorname]}" }
    add_field => { "[sensor-unit-10]" => "%{[sensors][9][sensorunit]}" }

Actually, because add_field is a multi-valued Hash data type, you can use just one declaration.
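For example, the per-sensor pairs collapse into a single declaration (a sketch following the same pattern as in the config below):

    add_field => {
      "[sensor-name-1]" => "%{[sensors][0][sensorname]}"
      "[sensor-unit-1]" => "%{[sensors][0][sensorunit]}"
      "[sensor-name-2]" => "%{[sensors][1][sensorname]}"
      "[sensor-unit-2]" => "%{[sensors][1][sensorunit]}"
      # ... continue the pattern up to [sensors][9]
    }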

I modified my test config not to use split:

input {
  generator {
    lines => [
      '{
        "mac_addr": "d880397a3b0f",
        "sensors": [
          {"sensor_id": "s01", "sensor_val": 31},
          {"sensor_id": "s02", "sensor_val": 32},
          {"sensor_id": "s03", "sensor_val": 33}
        ],
        "@timestamp": "2019-08-08T12:23:34Z"
      }'
    ]
    count => 1
    codec => json { }
  }
}

filter {
  # split { field => "[sensors]" }
  jdbc_static {
    loaders => [
      {
        id => "remote-device"
        query => 'select "device_id", "device_name", "spot_id", "spot_name", "loc_gb_code", "loc_gb_name", "ip_addr", "mac_addr" from rs_temp_device_t'
        local_table => "localDevice"
      },
      {
        id => "remote-sensor"
        query => 'select "device_id", "sensor_id", "sensor_name", "sensor_unit" from rs_temp_device_sensor_t'
        local_table => "localSensor"
      }
    ]
    local_db_objects => [
      {
        name => "localDevice"
        index_columns => ["device_id", "mac_addr"]
        columns => [
          ["device_id", "varchar(20)"],
          ["device_name", "varchar(20)"],
          ["spot_id", "varchar(20)"],
          ["spot_name", "varchar(20)"],
          ["loc_gb_code", "varchar(20)"],
          ["loc_gb_name", "varchar(20)"],
          ["ip_addr", "varchar(20)"],
          ["mac_addr", "varchar(20)"]
        ]
      },
      {
        name => "localSensor"
        index_columns => ["device_id", "sensor_id"]
        columns => [
          ["device_id", "varchar(20)"],
          ["sensor_id", "varchar(20)"],
          ["sensor_name", "varchar(20)"],
          ["sensor_unit", "varchar(20)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "SELECT device_id, device_name, spot_id, spot_name, loc_gb_code, loc_gb_name, ip_addr FROM localDevice WHERE mac_addr = :macaddr"
        parameters => {
          "macaddr" => "[mac_addr]"
        }
        target => "device"
      },
      {
        query => "SELECT sensor_name, sensor_unit FROM localSensor WHERE device_id = :devid"
        parameters => {
          "devid" => "[device][0][device_id]"
        }
        target => "sensor_info"
      }
    ]
    add_field => {
        "[sensors][0][sensor_name]" => "%{[sensor_info][0][sensor_name]}"
        "[sensors][1][sensor_name]" => "%{[sensor_info][1][sensor_name]}"
        "[sensors][2][sensor_name]" => "%{[sensor_info][2][sensor_name]}"
        "[sensors][0][sensor_unit]" => "%{[sensor_info][0][sensor_unit]}"
        "[sensors][1][sensor_unit]" => "%{[sensor_info][1][sensor_unit]}"
        "[sensors][2][sensor_unit]" => "%{[sensor_info][2][sensor_unit]}"
      }
    remove_field => ["sensor_info"]
    staging_directory => "/elastic/tmp/logstash-6.3.2/data/jdbc_static/import_data"
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/elastic/tmp/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/ls_test_2"
  }
}
output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

Results:

{
      "sequence" => 0,
       "sensors" => [
        [0] {
              "sensor_id" => "s01",
             "sensor_val" => 31,
            "sensor_unit" => "meter",
            "sensor_name" => "Sensor 1"
        },
        [1] {
              "sensor_id" => "s02",
             "sensor_val" => 32,
            "sensor_unit" => "meter",
            "sensor_name" => "Sensor 2"
        },
        [2] {
              "sensor_id" => "s03",
             "sensor_val" => 33,
            "sensor_unit" => "meter",
            "sensor_name" => "Sensor 3"
        }
    ],
    "@timestamp" => 2019-08-08T12:23:34.000Z,
      "mac_addr" => "d880397a3b0f",
      "@version" => "1",
          "host" => "Elastics-MacBook-Pro.local",
        "device" => [
        [0] {
            "device_name" => "Device 1",
            "loc_gb_code" => "code01",
              "device_id" => "dev01",
            "loc_gb_name" => "Code 1",
                "spot_id" => "spot01",
                "ip_addr" => "1.1.1.1",
              "spot_name" => "Spot 1"
        }
    ]
}

NOTE: You can also use the ruby filter to do the add_field in case there is no one-to-one correlation, for example if [sensors][0][sensor_id] is not "s01".
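A sketch of such a ruby filter, assuming (my assumption, it is not in the config above) that the second lookup query is extended to also select sensor_id, so that each [sensor_info] row can be matched to its element of the sensors array:

  ruby {
    code => '
      info = event.get("[sensor_info]") || []
      sensors = event.get("[sensors]") || []
      sensors.each_with_index do |sensor, i|
        # find the lookup row whose sensor_id matches this array element
        row = info.find { |r| r["sensor_id"] == sensor["sensor_id"] }
        next if row.nil?
        event.set("[sensors][#{i}][sensor_name]", row["sensor_name"])
        event.set("[sensors][#{i}][sensor_unit]", row["sensor_unit"])
      end
      event.remove("[sensor_info]")
    '
  }

This matches by sensor_id rather than relying on the array order of the lookup result.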

Thank you, I'll try your ideas.
Have a nice weekend.
