Hello,
I'm trying to combine two indexes using an elasticsearch input plugin and it's not working as intended (aggregation is not occuring). I'm setting the pipeline workers to 1.
Event #1
{
"_index": "lum-asset-ent-2020.04",
"_type": "_doc",
"_id": "19-00:50:56:a7:0d:fa",
"_version": 36,
"_score": 0,
"_source": {
"lumeta_vendor": "VMWare",
"@version": "1",
"@timestamp": "2020-04-23T18:14:26.466Z",
"path": "/usr/share/logstash/bin/lumeta/All Devices_34.csv",
"ip_address": "192.168.65.237",
"host": "dac427b2b5d2",
"Lumeta_scantypes": "[\"broadcastDiscovery\",\"hostDiscovery\",\"pathDiscovery\"]",
"mac_address": "00:50:56:a7:0d:fa",
"lumeta_identity": "null",
"lumeta_AS Name": "null",
"data_source": "lumeta",
"Lumeta_Last Observed": "04/23/2020 10:44:19 AM",
"lumeta_verson": "null",
"lumeta_snmpresponder": "false",
"lumeta_devicetype": "Server",
"lumeta_snmpaccessible": "false",
"Lumeta_Id": "19",
"reporting period": "2020.04",
"lumeta_model": "null",
"lumeta_protocols": "[\"arp\",\"icmp\",\"udp\"]",
"lumeta_First Observed": "04/20/2020 11:40:50 AM",
"message": "\"19\",\"192.168.65.237\",\"00:50:56:a7:0d:fa\",\"null\",\"null\",\"null\",\"true\",\"null\",\"Server\",\"04/20/2020 11:40:50 AM\",\"04/23/2020 10:44:19 AM\",\"null\",\"null\",\"false\",\"false\",\"[\"\"broadcastDiscovery\"\",\"\"hostDiscovery\"\",\"\"pathDiscovery\"\"]\",\"VMWare\",\"[\"\"arp\"\",\"\"icmp\"\",\"\"udp\"\"]\"",
"type": "asset",
"host_name": "null",
"lumeta_os": "null",
"lumeta_active": "true"
},
"fields": {
"@timestamp": [
"2020-04-23T18:14:26.466Z"
]
},
Event #2
{
"_index": "nxp-asset-ent-lin-svr-2020.04",
"_type": "_doc",
"_id": "XXXXXXXXX-1168",
"_version": 4,
"_score": 0,
"_source": {
"Nexpose": [
{
"Tag": "EDC XXXXX"
},
{
"Tag": "ZONE:OPERATION"
},
{
"Tag": "PARTNERXXXXX"
},
{
"Tag": "Service Line: ENT-LINUX"
}
],
"reporting period": "2020.04",
"os_system": "Linux",
"host_name": null,
"assessed_for_vulnerabilities": true,
"assessed_for_policies": true,
"type": "asset",
"credential_status": "All credentials failed",
"risk_modifier": 1,
"os_cpe": "cpe:/o:linux:linux_kernel:3.12",
"os_name": "Linux",
"mac_address": "00:50:56:a7:0d:fa",
"data_source": "Nexpose",
"os_version": "3.12",
"ip_address": "192.168.65.237",
"os_family": "Linux",
"os_type": "General",
"host_type": null,
"last_assessed_for_vulnerabilities": "2020-04-09T14:48:05.276Z",
"os_architecture": null,
"@version": "1",
"os_vendor": "Linux",
"os_description": "Linux 3.12",
"@timestamp": "2020-04-23T18:23:52.419Z",
"asset_id": 1168
},
"fields": {
"last_assessed_for_vulnerabilities": [
"2020-04-09T14:48:05.276Z"
],
"@timestamp": [
"2020-04-23T18:23:52.419Z"
]
},
What I'm trying to accomplish: (I only need the data_source and the ip_address tags in this combined event).
"data_source": [
{
"source": "Nexpose"
"source": "lumeta"
}
],
"ip_address": "192.168.65.237",
Two indexes contain like for like fields (ip_address and data_source), so I assumed the aggregation would be rather simple but despite my configuration, the events are not being aggregation and I'm received two events per IP. I'm at a loss as to why it doesn't work - perhaps I'm missing how the events are streamed using the elasticsearch input plugin.
Config:
input {
elasticsearch {
hosts => "192.168.65.240:9200"
index => "nxp-asset-ent-*,lum-asset-*"
query => '{
"_source": ["ip_address", "data_source"],
"query": {
"bool": {
"must": {
"term": { "type" : "asset" }
},
"filter": {
"range": {
"@timestamp": { "gte": "now-30d/d", "lt": "now/d" }
}
}
}
}
}'
}
}
filter {
aggregate {
task_id => "%{ip_address}"
code =>
"
map['ip_address'] = event.get('ip_address')
map['data_source'] ||= []
map['data_source'] << {'source' => event.get('data_source')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
}
output {
elasticsearch {
hosts => ["192.168.65.240:9200"]
index => "nxp_lumeta"
}
# stdout { codec => rubydebug }
}
What I'm actually getting.....
{
"_index": "nxp_lumeta",
"_type": "_doc",
"_id": "6bYqrHEBL_UTiu-bBwEO",
"_version": 1,
"_score": 0,
"_source": {
"@timestamp": "2020-04-24T12:29:27.431Z",
"data_source": [
{
"source": "Nexpose"
}
],
"ip_address": "192.168.65.237",
"@version": "1"
},
"fields": {
"@timestamp": [
"2020-04-24T12:29:27.431Z"
]
},
{
"_index": "nxp_lumeta",
"_type": "_doc",
"_id": "nrYqrHEBL_UTiu-bBwL2",
"_version": 1,
"_score": 0,
"_source": {
"@timestamp": "2020-04-24T12:29:27.889Z",
"data_source": [
{
"source": "lumeta"
}
],
"ip_address": "192.168.65.237",
"@version": "1"
},
"fields": {
"@timestamp": [
"2020-04-24T12:29:27.889Z"
]
},
Thanks for any help that can be offered!