How to "join" two different types of documents on the closest value of a common integer key

I have a problem. I've inherited legacy code for which ELK stack is now being used to capture and detect problems. Logstash + Filebeat are being used and an index template is being used to correctly map WGS84 points.

I have data that looks like this:

[
    "survey_item": {
        "geo": "POINT (-87.382741 38.411667)",
        "pos_index": 17382
    },
    "survey_item": {
        "geo": "POINT (-87.382738 38.411678)"
        "pos_index": 17470
    },
    ...
]
[
    "traj_pos": {
        "pos_index": 17488,
        "density": 0.38215
    },
    "traj_pos": {
        "pos_index": 17468,
        "density": 0.97231
    },
    ...
]

I have been trying to come up with a way to "join" traj_pos documents with survey_item documents on the pos_index field within Elasticsearch so I can plot them on a map as the field survey_item.geo colored according to traj_pos.density.

But as you can see from my sample data, any given pos_index value may not exist in both survey_item and traj_pos documents. So I'm looking to do a join to the closest value of pos_index. Is this possible via a query (in some language)?

I know this can be done with some python. I know how to do it. But I want the pipeline from Filebeat to Logstash to Elasticsearch to Kibana to stand alone without requiring manipulation if at all possible.

Alternatively, I could do some Logstash manipulation with the ruby filter to get what I want, but I'd rather find a way to do this that doesn't require setting the number of Logstash workers to 1 and forcing serial execution.

I'm getting the feeling there's no way to do this with Elasticsearch.

The lack of responses likely indicate that is the case. I can personally not think of any way to do it.

1 Like

Wouldn't an enrich pipeline work for this? This is almost exactly what I need for this, but there's no way to force the pipeline query to use the custom score I'm using because I don't know how to reference the search term within the script or pass it via params.

See the "???" in the script part below. I'm guessing access to the actual match_field value is hidden from this enrich/ingest pipeline interface.

The query works to find the points I need, and I'm using the ES Rust client library implementation to make this "join". I just wonder if I could do this entirely within the ingest pipeline mechanism.

PUT /_enrich/policy/add-geodata-policy
{
  "match": {
    "indices": "survey-data-XXXX-XX-XX",
    "match_field": "position_index",
    "enrich_fields": [
      "geo_point"
    ],
    "query": {
      "script_score": {
        "query": {
          "match_all": {}
        }
      },
      "script": {
        "source": "decayNumericLinear(???, params.scale, params.offset, params.decay, doc['position_index'].value)",
        "params": {
          "bucket_pos": 120004,
          "scale": 1,
          "offset": 0,
          "decay": 0.5
        }
      }
    }
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.