How to aggregate data in Elasticsearch sent by Logstash

I am working with the schema below, grouping multiple households based on the keyInformation. We could achieve this with the Logstash aggregate filter, but that filter has performance issues and cannot run with multiple threads/pipelines. Is there a way to do this without the aggregate filter? Can we send the data to Elasticsearch without aggregation and have Elasticsearch do the aggregation based on the keyInformation?

Here is the schema:

POST /test/_mapping
{
  "dynamic": "strict",
  "properties": {
    "id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
    "keyInformation": {
      "properties": {
        "entityId": { "type": "long" },
        "entityType": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 3 } } },
        "environmentId": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 50 } } },
        "firmId": { "type": "long" },
        "key": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
        "lastUpdated": { "type": "date" }
      }
    },
    "associatedHouseholds": {
      "type": "nested",
      "properties": {
        "householdId": { "type": "long" },
        "type": { "type": "long" },
        "nickname": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
        "title": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
        "acctStartDate": { "type": "date" },
        "acctStopDate": { "type": "date" },
        "lastUpdated": { "type": "date" }
      }
    }
  }
}

Sample data:

ENVIRONMENT_ID|ENTITY_ID|ENTITY_TYPE|SPN_FIRM_ID|householdId|TYPE|TITLE|NICKNAME|ACCTSTARTDATE|ACCTSTOPDATE
QA_FDX|363147|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2019-12-01
QA_FDX|363147|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2020-12-01
QA_FDX|363149|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2019-12-01
QA_FDX|363149|AC|101|146182|1|HH TITLE 146182|HH NICKNAME 146182|2019-01-01|2029-12-01
QA_FDX|363150|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2039-12-01
QA_FDX|363152|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2049-12-01
QA_FDX|363153|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2059-12-01
QA_FDX|363147|AC|101|146182|1|HH TITLE 146182|HH NICKNAME 146182|2019-01-01|2019-12-01
QA_FDX|363147|AC|101|146183|1|HH TITLE 146183|HH NICKNAME 146183|2019-01-01|2020-12-01
QA_FDX|363149|AC|101|146183|1|HH TITLE 146183|HH NICKNAME 146183|2019-01-01|2019-12-01
QA_FDX|363149|AC|101|146184|1|HH TITLE 146184|HH NICKNAME 146184|2019-01-01|2029-12-01
QA_FDX|363150|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2039-12-01
QA_FDX|363152|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2049-12-01
QA_FDX|363153|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2059-12-01

Expected document loaded into Elasticsearch:

      "associatedHouseholds" : [
        {
          "acctStopDate" : "2019-12-01",
          "nickname" : "HH NICKNAME 146180",
          "type" : "1",
          "householdId" : "146180",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146180"
        },
        {
          "acctStopDate" : "2020-12-01",
          "nickname" : "HH NICKNAME 146181",
          "type" : "1",
          "householdId" : "146181",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146181"
        },
        {
          "acctStopDate" : "2019-12-01",
          "nickname" : "HH NICKNAME 146182",
          "type" : "1",
          "householdId" : "146182",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146182"
        },
        {
          "acctStopDate" : "2020-12-01",
          "nickname" : "HH NICKNAME 146183",
          "type" : "1",
          "householdId" : "146183",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146183"
        }
      ],
      "keyInformation" : {
        "entityType" : "AC",
        "firmId" : "101",
        "entityId" : "363147",
        "environmentId" : "QA_FDX",
        "lastUpdated" : "2019-10-21T22:19:14.243Z"
      }

Hi,

You should be able to do this using a transform (see https://www.elastic.co/guide/en/elasticsearch/reference/7.4/put-transform.html). The workflow would look like this:

  1. You index the data into Elasticsearch as single docs; this is the "source" index.
  2. You create a transform that pulls data from the "source" and aggregates it according to your needs into a "dest" index.

In your case you would group by entity ID, firm ID, etc. and define aggregations, e.g. lastUpdated would be a max aggregation. Combining the docs is trickier but doable using a scripted metric aggregation:

"associatedHouseholds": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": "def ret = []; for (s in states) {for (d in s) { ret.add(d);}}return ret"
        }
      }
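
Put together, the whole transform request would look roughly like the sketch below. This is only a sketch: the transform id household_grouping, the source index household_rows, the dest index test and the flat group_by output field names are placeholders for your setup, and on older 7.x versions the same request is issued against the _data_frame/transforms endpoint instead of _transform.

PUT _transform/household_grouping
{
  "source": { "index": "household_rows" },
  "dest": { "index": "test" },
  "pivot": {
    "group_by": {
      "environmentId": { "terms": { "field": "keyInformation.environmentId.keyword" } },
      "entityId": { "terms": { "field": "keyInformation.entityId" } },
      "entityType": { "terms": { "field": "keyInformation.entityType.keyword" } },
      "firmId": { "terms": { "field": "keyInformation.firmId" } }
    },
    "aggregations": {
      "lastUpdated": { "max": { "field": "keyInformation.lastUpdated" } },
      "associatedHouseholds": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": "state.docs.add(new HashMap(params['_source']))",
          "combine_script": "return state.docs",
          "reduce_script": "def ret = []; for (s in states) { for (d in s) { ret.add(d); } } return ret"
        }
      }
    }
  }
}

Once created, you start it with POST _transform/household_grouping/_start.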

In order to do this as data comes in, you need to sync source and dest via a timestamp field. It seems like lastUpdated is also the ingest timestamp and can therefore be used for this.
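
Concretely, that would be a sync section like the one below in the transform request; the field name and the 60s delay are just assumptions to adapt to your ingest latency:

  "sync": {
    "time": {
      "field": "keyInformation.lastUpdated",
      "delay": "60s"
    }
  }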

Note that instead of writing to an index you can write to an ingest pipeline; this enables you to do further operations on the aggregated data.
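
In the transform request that is just an additional pipeline parameter on dest (household_enrich is a hypothetical pipeline name):

  "dest": {
    "index": "test",
    "pipeline": "household_enrich"
  }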

I hope this helps!


That is interesting. My initial thoughts were to create an aggregated query as input for Logstash, then create a CSV file with the results to be imported into a new index. Questions:

  1. Can "Transform" also be used to enrich new fields or alter existing values (e.g. If value X then XYZ)?
  2. Can the timestamp field of new index use the values of a date field (not when the event was created)? In my case, the origin index has its timestamp values overridden via logstash (date filter).

Thank you.

Regarding your questions:

  1. Transform can indirectly enrich documents by writing the output into an ingest pipeline instead of an index; this requires at least 7.4 (see the sketch right after this list). See
    https://www.elastic.co/guide/en/elasticsearch/reference/7.4/put-transform.html#put-transform-request-body for how to specify a destination and
    https://www.elastic.co/guide/en/elasticsearch/reference/7.4/ingest.html regarding ingest.

  2. I am not sure if I fully understand the question; if not, please let me know. You are free to choose which date field you want to take over from the origin index to the dest index; that can be any date field. As you aggregate data you have to specify an aggregation. You can also aggregate the earliest (min) and the latest (max) within the same transform.
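
As a sketch of what such an enrichment pipeline could look like (pipeline name, field names and values are purely illustrative), a set processor with a condition can implement "if value X then XYZ":

PUT _ingest/pipeline/household_enrich
{
  "description": "Illustrative enrichment: derive a label from an existing value",
  "processors": [
    {
      "set": {
        "if": "ctx.entityType == 'AC'",
        "field": "entityTypeLabel",
        "value": "Account"
      }
    }
  ]
}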

When it comes to synchronizing origin and dest you are also free to choose the field you want to base synchronization on, however there is a catch: synchronization queries the origin index for changes based on the current time minus the configured delay. Therefore the timestamp you choose must be a real ingest timestamp and not an arbitrary date field. delay compensates for possible ingest delay, i.e. the time it takes from pushing the data into the system until it is searchable in the origin index. A timestamp that is closer to indexing time should be preferred, so I would rather choose the Logstash timestamp than the timestamp of the feeder.

