Merging two documents from existing ES indices into a new document

Hi all

I'd like Logstash to run through all documents in the index "timeseries" and merge each one with the matching document in another index called "contract". The resulting merged document should be written to a destination index called "merge". A "timeseries" document should only be processed if there is a "contract" document for which all of the following criteria hold:

"asset_id" = "ts_asset_id" AND
"contract_direction" = "ts_direction" AND
"ts_timestamp" >= "contract_from" AND "ts_timestamp" <= "contract_to"

I assume this task can be done with the ES input, ES filter and ES output plugins (https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html), but I am totally unfamiliar with the query language and with Ruby. Can someone propose a Logstash .conf file that does the job?

Logstash 7.4.2
ES 7.4.0

Thanks and BR

samples:

document 1, index: "timeseries":
{
    "ts_asset_id": "LU1234",
    "ts_direction": "consumption",
    "ts_timestamp": "2019-10-09T16:15:00.000Z",
    "ts_type": "actual",
    "ts_value": 2.6
}

document 1, index: "contract":
{
    "asset_id": "LU1234",
    "contract_id": "gri7777777",
    "contract_direction": "consumption",
    "contract_from": "2019-09-30T22:00:00.000Z",
    "contract_to": "2019-10-31T23:00:00.000Z",
    "asset_gridlevel": "5"
}

expected output document, index: "merge":
{
    "ts_asset_id": "LU1234",
    "ts_direction": "consumption",
    "ts_timestamp": "2019-10-09T16:15:00.000Z",
    "ts_type": "actual",
    "ts_value": 2.6,
    "asset_id": "LU1234",
    "contract_id": "gri7777777",
    "contract_direction": "consumption",
    "contract_from": "2019-09-30T22:00:00.000Z",
    "contract_to": "2019-10-31T23:00:00.000Z",
    "asset_gridlevel": "5"
}

/////////////////////////////////

document 2, index: "timeseries":
{
    "ts_asset_id": "LU1234",
    "ts_direction": "consumption",
    "ts_timestamp": "2019-11-04T06:30:00.000Z",
    "ts_type": "actual",
    "ts_value": 3.7
}

document 1, index: "contract":
{
    "asset_id": "LU1234",
    "contract_id": "gri7777777",
    "contract_direction": "consumption",
    "contract_from": "2019-09-30T22:00:00.000Z",
    "contract_to": "2019-10-31T23:00:00.000Z",
    "asset_gridlevel": "5"
}

expected output document, index: "merge":
no output document ("ts_timestamp" > "contract_to")

/////////////////////////////////

document 3, index: "timeseries":
{
    "ts_asset_id": "LU1234",
    "ts_direction": "consumption",
    "ts_timestamp": "2019-02-04T08:45:00.000Z",
    "ts_type": "actual",
    "ts_value": 14.2
}

document 1, index: "contract":
{
    "asset_id": "LU1234",
    "contract_id": "gri7777777",
    "contract_direction": "consumption",
    "contract_from": "2019-09-30T22:00:00.000Z",
    "contract_to": "2019-10-31T23:00:00.000Z",
    "asset_gridlevel": "5"
}

expected output document, index: "merge":
no output document ("ts_timestamp" < "contract_from")

/////////////////////////////////

document 1, index: "timeseries":
{
    "ts_asset_id": "LU1234",
    "ts_direction": "consumption",
    "ts_timestamp": "2019-10-09T16:15:00.000Z",
    "ts_type": "actual",
    "ts_value": 2.6
}

document 2, index: "contract":
{
    "asset_id": "LU4567",
    "contract_id": "gri88888888",
    "contract_direction": "production",
    "contract_from": "2019-09-30T22:00:00.000Z",
    "contract_to": "2019-10-31T23:00:00.000Z",
    "asset_gridlevel": "7"
}

expected output document, index: "merge":
no output document ("ts_asset_id" != "asset_id") and/or ("ts_direction" != "contract_direction")

Or is Logstash not capable of doing the job, and should I rather solve it outside of Logstash, e.g. with a Python script?

Or does someone have an idea how to solve this with the aggregate filter plugin?

Hi.

I suggest you read the first index ("timeseries") with the ES input plugin.

Then, in the filter section, use the ES filter plugin to search for the matching doc in the second index ("contract").

If nothing matches, drop the event.
If something does, enrich the current doc with the values returned by the filter plugin.

Then index the enriched doc into your destination index ("merge").

Pay attention to the filter plugin settings so that you limit the number of docs returned by the lookup.
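
As a rough sketch of the steps above, the pipeline could look something like the following. Several things here are assumptions on my side and not taken from your setup: the host localhost:9200, that "contract_from" and "contract_to" are mapped as date fields (needed for the range clauses and the sort), that "ts_timestamp" arrives in the event as a plain ISO 8601 string, and the choice of "contract_id" as the field that signals a successful lookup. Treat it as a starting point to test against a small sample, not a finished config.

```
input {
  # Step 1: read every document from the source index.
  elasticsearch {
    hosts => ["localhost:9200"]   # assumption: adjust to your cluster
    index => "timeseries"
    query => '{ "query": { "match_all": {} } }'
  }
}

filter {
  # Step 2: look up a contract with the same asset and direction whose
  # validity period contains the event's timestamp. Square brackets make
  # both range bounds inclusive (>= and <=).
  elasticsearch {
    hosts       => ["localhost:9200"]
    index       => "contract"
    query       => 'asset_id:"%{[ts_asset_id]}" AND contract_direction:"%{[ts_direction]}" AND contract_from:[* TO "%{[ts_timestamp]}"] AND contract_to:["%{[ts_timestamp]}" TO *]'
    result_size => 1                      # keep the lookup small
    sort        => "contract_from:desc"   # the default sort is on @timestamp, which "contract" does not have
    # Copy fields from the contract document (left) into the event (right).
    fields      => {
      "asset_id"           => "asset_id"
      "contract_id"        => "contract_id"
      "contract_direction" => "contract_direction"
      "contract_from"      => "contract_from"
      "contract_to"        => "contract_to"
      "asset_gridlevel"    => "asset_gridlevel"
    }
  }

  # Step 3: if no contract field was copied, the lookup found nothing.
  if ![contract_id] {
    drop { }
  }
}

output {
  # Step 4: write the enriched event to the destination index.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "merge"
  }
}
```

Note that Logstash adds its own "@timestamp" and "@version" fields to every event, so the documents in "merge" will contain those in addition to the fields in your expected output unless you remove them, for example with a mutate filter.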

Regards