Elasticsearch Input and Aggregation Issue

Hello,

I'm trying to combine two indexes using an elasticsearch input plugin and it's not working as intended (aggregation is not occurring). I'm setting the pipeline workers to 1.

Event #1

{
  "_index": "lum-asset-ent-2020.04",
  "_type": "_doc",
  "_id": "19-00:50:56:a7:0d:fa",
  "_version": 36,
  "_score": 0,
  "_source": {
    "lumeta_vendor": "VMWare",
    "@version": "1",
    "@timestamp": "2020-04-23T18:14:26.466Z",
    "path": "/usr/share/logstash/bin/lumeta/All Devices_34.csv",
    "ip_address": "192.168.65.237",
    "host": "dac427b2b5d2",
    "Lumeta_scantypes": "[\"broadcastDiscovery\",\"hostDiscovery\",\"pathDiscovery\"]",
    "mac_address": "00:50:56:a7:0d:fa",
    "lumeta_identity": "null",
    "lumeta_AS Name": "null",
    "data_source": "lumeta",
    "Lumeta_Last Observed": "04/23/2020 10:44:19 AM",
    "lumeta_verson": "null",
    "lumeta_snmpresponder": "false",
    "lumeta_devicetype": "Server",
    "lumeta_snmpaccessible": "false",
    "Lumeta_Id": "19",
    "reporting period": "2020.04",
    "lumeta_model": "null",
    "lumeta_protocols": "[\"arp\",\"icmp\",\"udp\"]",
    "lumeta_First Observed": "04/20/2020 11:40:50 AM",
    "message": "\"19\",\"192.168.65.237\",\"00:50:56:a7:0d:fa\",\"null\",\"null\",\"null\",\"true\",\"null\",\"Server\",\"04/20/2020 11:40:50 AM\",\"04/23/2020 10:44:19 AM\",\"null\",\"null\",\"false\",\"false\",\"[\"\"broadcastDiscovery\"\",\"\"hostDiscovery\"\",\"\"pathDiscovery\"\"]\",\"VMWare\",\"[\"\"arp\"\",\"\"icmp\"\",\"\"udp\"\"]\"",
    "type": "asset",
    "host_name": "null",
    "lumeta_os": "null",
    "lumeta_active": "true"
  },
  "fields": {
    "@timestamp": [
      "2020-04-23T18:14:26.466Z"
    ]
  },

Event #2

{
  "_index": "nxp-asset-ent-lin-svr-2020.04",
  "_type": "_doc",
  "_id": "XXXXXXXXX-1168",
  "_version": 4,
  "_score": 0,
  "_source": {
    "Nexpose": [
      {
        "Tag": "EDC XXXXX"
      },
      {
        "Tag": "ZONE:OPERATION"
      },
      {
        "Tag": "PARTNERXXXXX"
      },
      {
        "Tag": "Service Line: ENT-LINUX"
      }
    ],
    "reporting period": "2020.04",
    "os_system": "Linux",
    "host_name": null,
    "assessed_for_vulnerabilities": true,
    "assessed_for_policies": true,
    "type": "asset",
    "credential_status": "All credentials failed",
    "risk_modifier": 1,
    "os_cpe": "cpe:/o:linux:linux_kernel:3.12",
    "os_name": "Linux",
    "mac_address": "00:50:56:a7:0d:fa",
    "data_source": "Nexpose",
    "os_version": "3.12",
    "ip_address": "192.168.65.237",
    "os_family": "Linux",
    "os_type": "General",
    "host_type": null,
    "last_assessed_for_vulnerabilities": "2020-04-09T14:48:05.276Z",
    "os_architecture": null,
    "@version": "1",
    "os_vendor": "Linux",
    "os_description": "Linux 3.12",
    "@timestamp": "2020-04-23T18:23:52.419Z",
    "asset_id": 1168
  },
  "fields": {
    "last_assessed_for_vulnerabilities": [
      "2020-04-09T14:48:05.276Z"
    ],
    "@timestamp": [
      "2020-04-23T18:23:52.419Z"
    ]
  },

What I'm trying to accomplish: (I only need the data_source and the ip_address tags in this combined event).

 "data_source": [
      {
        "source": "Nexpose"
      },
      {
        "source": "lumeta"
      }
    ],
    "ip_address": "192.168.65.237",

Two indexes contain like-for-like fields (ip_address and data_source), so I assumed the aggregation would be rather simple, but despite my configuration, the events are not being aggregated and I'm receiving two events per IP. I'm at a loss as to why it doesn't work - perhaps I'm missing how the events are streamed using the elasticsearch input plugin.

Config:

input {
  elasticsearch {
    hosts => "192.168.65.240:9200"
    index => "nxp-asset-ent-*,lum-asset-*"
    query => '{
      "_source": ["ip_address", "data_source"],
      "query": {
        "bool": {
          "must": {
            "term": { "type" : "asset" }
          },
          "filter": {
            "range": {
            "@timestamp": { "gte": "now-30d/d", "lt": "now/d" }
            }
          }
        }
      }
    }'
  }
}

filter {
    aggregate {
    task_id => "%{ip_address}"
    code =>
    "
      map['ip_address'] = event.get('ip_address')

      map['data_source'] ||= []
      map['data_source'] << {'source' => event.get('data_source')}

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
    }
}


output {
   elasticsearch {
   hosts => ["192.168.65.240:9200"]
   index => "nxp_lumeta"
   }
#  stdout { codec => rubydebug }
}

What I'm actually getting.....

{
  "_index": "nxp_lumeta",
  "_type": "_doc",
  "_id": "6bYqrHEBL_UTiu-bBwEO",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2020-04-24T12:29:27.431Z",
    "data_source": [
      {
        "source": "Nexpose"
      }
    ],
    "ip_address": "192.168.65.237",
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2020-04-24T12:29:27.431Z"
    ]
  },

{
  "_index": "nxp_lumeta",
  "_type": "_doc",
  "_id": "nrYqrHEBL_UTiu-bBwL2",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2020-04-24T12:29:27.889Z",
    "data_source": [
      {
        "source": "lumeta"
      }
    ],
    "ip_address": "192.168.65.237",
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2020-04-24T12:29:27.889Z"
    ]
  },

Thanks for any help that can be offered!

That will only work if the events are sorted by IP address, and I do not see a sort in your query. Note that you have to disable java_execution to maintain document order in 7.0 through 7.6.x. push_map_as_event_on_timeout might work better.

Thank you Badger. Issue solved.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.