Logstash filter - creating a list from multiple events

I am working with the schema below, grouping multiple households by their keyInformation. We can do this with the Logstash aggregate filter (our current approach is sketched below, after the expected output), but it has performance problems and cannot run with multiple worker threads/pipelines. Is there a way to do this without the aggregate filter?

Here is the schema:

POST /test/_mapping
{
  "dynamic": "strict",
  "properties": {
    "id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
    "keyInformation": {
      "properties": {
        "entityId": { "type": "long" },
        "entityType": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 3 } } },
        "environmentId": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 50 } } },
        "firmId": { "type": "long" },
        "key": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
        "lastUpdated": { "type": "date" }
      }
    },
    "associatedHouseholds": {
      "type": "nested",
      "properties": {
        "householdId": { "type": "long" },
        "type": { "type": "long" },
        "nickname": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
        "title": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 100 } } },
        "acctStartDate": { "type": "date" },
        "acctStopDate": { "type": "date" },
        "lastUpdated": { "type": "date" }
      }
    }
  }
}

Sample data:

ENVIRONMENT_ID|ENTITY_ID|ENTITY_TYPE|SPN_FIRM_ID|householdId|TYPE|TITLE|NICKNAME|ACCTSTARTDATE|ACCTSTOPDATE
QA_FDX|363147|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2019-12-01
QA_FDX|363147|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2020-12-01
QA_FDX|363149|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2019-12-01
QA_FDX|363149|AC|101|146182|1|HH TITLE 146182|HH NICKNAME 146182|2019-01-01|2029-12-01
QA_FDX|363150|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2039-12-01
QA_FDX|363152|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2049-12-01
QA_FDX|363153|AC|101|146180|1|HH TITLE 146180|HH NICKNAME 146180|2019-01-01|2059-12-01
QA_FDX|363147|AC|101|146182|1|HH TITLE 146182|HH NICKNAME 146182|2019-01-01|2019-12-01
QA_FDX|363147|AC|101|146183|1|HH TITLE 146183|HH NICKNAME 146183|2019-01-01|2020-12-01
QA_FDX|363149|AC|101|146183|1|HH TITLE 146183|HH NICKNAME 146183|2019-01-01|2019-12-01
QA_FDX|363149|AC|101|146184|1|HH TITLE 146184|HH NICKNAME 146184|2019-01-01|2029-12-01
QA_FDX|363150|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2039-12-01
QA_FDX|363152|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2049-12-01
QA_FDX|363153|AC|101|146181|1|HH TITLE 146181|HH NICKNAME 146181|2019-01-01|2059-12-01

Expected Logstash output:

      "associatedHouseholds" : [
        {
          "acctStopDate" : "2019-12-01",
          "nickname" : "HH NICKNAME 146180",
          "type" : "1",
          "householdId" : "146180",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146180"
        },
        {
          "acctStopDate" : "2020-12-01",
          "nickname" : "HH NICKNAME 146181",
          "type" : "1",
          "householdId" : "146181",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146181"
        },
        {
          "acctStopDate" : "2019-12-01",
          "nickname" : "HH NICKNAME 146182",
          "type" : "1",
          "householdId" : "146182",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146182"
        },
        {
          "acctStopDate" : "2020-12-01",
          "nickname" : "HH NICKNAME 146183",
          "type" : "1",
          "householdId" : "146183",
          "acctStartDate" : "2019-01-01",
          "title" : "HH TITLE 146183"
        }
      ],
      "keyInformation" : {
        "entityType" : "AC",
        "firmId" : "101",
        "entityId" : "363147",
        "environmentId" : "QA_FDX",
        "lastUpdated" : "2019-10-21T22:19:14.243Z"
      }
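
For reference, here is roughly what our current aggregate-based pipeline looks like. This is a minimal sketch; the csv column names, the timeout, and the input format are assumptions based on the sample data above.

    filter {
      csv {
        separator => "|"
        columns   => ["environmentId", "entityId", "entityType", "firmId", "householdId",
                      "type", "title", "nickname", "acctStartDate", "acctStopDate"]
      }
      aggregate {
        # One aggregation task per entity; this pattern requires a single
        # worker thread (-w 1), which is the bottleneck we want to remove
        task_id => "%{environmentId}_%{entityId}"
        code => "
          map['keyInformation'] ||= {
            'environmentId' => event.get('environmentId'),
            'entityId'      => event.get('entityId'),
            'entityType'    => event.get('entityType'),
            'firmId'        => event.get('firmId')
          }
          map['associatedHouseholds'] ||= []
          map['associatedHouseholds'] << {
            'householdId'   => event.get('householdId'),
            'type'          => event.get('type'),
            'title'         => event.get('title'),
            'nickname'      => event.get('nickname'),
            'acctStartDate' => event.get('acctStartDate'),
            'acctStopDate'  => event.get('acctStopDate')
          }
          event.cancel
        "
        push_map_as_event_on_timeout => true
        timeout => 30
      }
    }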

It might be possible to use an elasticsearch filter to get the list of household ids, then use an http filter to run Elasticsearch queries that fetch the data for each id. It might not improve performance much, though.
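
Something along these lines, perhaps. This is an untested sketch: the hosts, index name, and lookup query are assumptions, and a real pipeline would still need extra ruby code to iterate over every id the lookup returns.

    filter {
      # Look up what is already indexed for this entity and copy it into the event
      elasticsearch {
        hosts  => ["localhost:9200"]
        index  => "test"
        query  => "keyInformation.entityId:%{[keyInformation][entityId]}"
        fields => { "associatedHouseholds" => "existingHouseholds" }
      }
      # Run an ad-hoc search for a single household id (field reference is hypothetical)
      http {
        url         => "http://localhost:9200/test/_search?q=associatedHouseholds.householdId:%{householdId}"
        target_body => "[@metadata][householdLookup]"
      }
    }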

I don't know whether you can query Elasticsearch and get it to aggregate the data the way you want.

How about we don't do the aggregation in Logstash and just send the individual records to Elasticsearch? Is it possible for Elasticsearch to aggregate these based on its schema?
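
For example, could each record be sent as a scripted upsert that appends its household to the nested array? A rough, untested sketch of the idea: the document_id scheme and the painless script are guesses, and it assumes the event has already been reshaped into keyInformation and household sub-fields.

    output {
      elasticsearch {
        hosts           => ["localhost:9200"]
        index           => "test"
        document_id     => "%{[keyInformation][environmentId]}_%{[keyInformation][entityId]}"
        action          => "update"
        scripted_upsert => true
        script_lang     => "painless"
        script_type     => "inline"
        # On the first record for an entity the source is empty, so initialize it;
        # afterwards just append this record's household object
        script => "
          if (ctx._source.associatedHouseholds == null) {
            ctx._source.keyInformation = params.event.keyInformation;
            ctx._source.associatedHouseholds = [];
          }
          ctx._source.associatedHouseholds.add(params.event.household);
        "
      }
    }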

As I said, I do not know if Elasticsearch can do that. Try asking a new question in the Elasticsearch forum.
