I have 14 million records arriving every 5 minutes from Filebeat into a data stream (NetFlow data). My goal is to aggregate this data based on certain rules, such as summing source bytes and destination bytes for specific protocols. Here's the challenge:
- **Grouping:** Data is grouped using a key (`community.id`) within 5-minute intervals.
- **Missing data:** The `destination.bytes` field in the source data is always empty. I need to derive `destination.bytes` and `destination.packets` from the IP flow direction (e.g., bidirectional vs. unidirectional flows).
- **Volume and velocity:** Handling this massive volume and velocity of data requires efficient transformation logic.
- **Elastic feasibility:** Can Elasticsearch handle this scale effectively?
How should I configure the transformation pipeline to achieve this aggregation reliably?
I'm running Elastic on Azure with a 3-node setup.
Transformation Configuration:
Below is the current transformation definition in Elasticsearch. It attempts to derive destination data (bytes and packets) from reverse flows, but I need advice on whether this approach is optimal and achievable for my use case.
```json
PUT _transform/odp
{
  "source": {
    "index": "logs-netflow.log*",
    "query": {
      "bool": {
        "must": [
          { "range": { "event.start": { "gte": "now-15m", "lt": "now" } } },
          { "terms": { "network.iana_number": [17, "17"] } }
        ]
      }
    }
  },
  "dest": {
    "index": "odp_prep_hscn_netflow_simple_idx"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "event.start",
      "delay": "1m"
    }
  },
  "pivot": {
    "group_by": {
      "community_id": { "terms": { "field": "network.community_id" } },
      "interval": { "date_histogram": { "field": "event.start", "fixed_interval": "5m" } },
      "source_ip": { "terms": { "field": "source.ip" } },
      "destination_ip": { "terms": { "field": "destination.ip" } }
    },
    "aggregations": {
      "source_bytes": {
        "sum": { "field": "source.bytes" }
      },
      "destination_bytes": {
        "bucket_script": {
          "buckets_path": { "reverse_source_bytes": "source_bytes" },
          "script": """
            // Sum the source.bytes where destination.ip matches source.ip in reverse flows
            return params.reverse_source_bytes != null ? params.reverse_source_bytes : 0;
          """
        }
      },
      "source_packets": {
        "sum": { "field": "source.packets" }
      },
      "destination_packets": {
        "bucket_script": {
          "buckets_path": { "reverse_source_packets": "source_packets" },
          "script": """
            // Sum the source.packets where destination.ip matches source.ip in reverse flows
            return params.reverse_source_packets != null ? params.reverse_source_packets : 0;
          """
        }
      },
      "sorted_event": {
        "top_metrics": {
          "metrics": [
            { "field": "event.created", "missing": null },
            { "field": "event.duration", "missing": 0 },
            { "field": "event.ingested", "missing": null },
            { "field": "network.direction", "missing": "unknown" },
            { "field": "agent.name", "missing": "unknown" },
            { "field": "input.type", "missing": "netflow" },
            { "field": "network.transport", "missing": "udp" },
            { "field": "network.iana_number", "missing": 17 },
            { "field": "netflow.ip_class_of_service", "missing": -1 },
            { "field": "netflow.ip_precedence", "missing": -1 },
            { "field": "netflow.exporter.address", "missing": "unknown" },
            { "field": "netflow.ingress_interface", "missing": -1 },
            { "field": "netflow.egress_interface", "missing": -1 },
            { "field": "netflow.icmp_type_ipv4", "missing": -1 },
            { "field": "netflow.icmp_code_ipv4", "missing": -1 },
            { "field": "tcp_control_bits", "missing": 0 },
            { "field": "source.locality", "missing": "internal" },
            { "field": "destination.locality", "missing": "internal" },
            { "field": "source.nat.ip", "missing": "0.0.0.0" },
            { "field": "destination.nat.ip", "missing": "0.0.0.0" },
            { "field": "source.nat.port", "missing": 0 },
            { "field": "destination.nat.port", "missing": 0 },
            { "field": "observer.ip", "missing": "0.0.0.0" },
            { "field": "source.ip", "missing": "0.0.0.0" },
            { "field": "destination.ip", "missing": "0.0.0.0" },
            { "field": "source.port", "missing": null },
            { "field": "destination.port", "missing": null },
            { "field": "tags" },
            { "field": "response_count", "missing": 0 },
            { "field": "source.bytes", "missing": 0 },
            { "field": "destination.bytes", "missing": 0 },
            { "field": "source.packets", "missing": 0 },
            { "field": "destination.packets", "missing": 0 },
            { "field": "netflow.exporter.ip", "missing": null },
            { "field": "netflow.exporter.port", "missing": 0 },
            { "field": "request_count", "missing": 0 },
            { "field": "event.start", "missing": null },
            { "field": "event.end", "missing": null }
          ],
          "sort": [
            { "event.start": { "order": "desc" } }
          ],
          "size": 1
        }
      }
    }
  },
  "settings": {
    "docs_per_second": 500000,
    "max_page_search_size": 65535
  }
}
```
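One thing worth flagging in the config above: a `bucket_script` can only see metrics from its own bucket, and because `source_ip` and `destination_ip` are part of `group_by`, the forward and reverse flows of the same `community.id` land in *different* buckets. As written, `destination_bytes` will therefore always just mirror `source_bytes`. One alternative (a sketch, not tested at this volume) is to group only by `community_id` and `interval` and split the directions inside each bucket with a `scripted_metric`. The field names match the source above, but the direction-attribution rule (first-seen `source.ip` is treated as the initiator) is an assumption you may want to replace with something deterministic, such as the lexicographically smaller IP:

```json
"pivot": {
  "group_by": {
    "community_id": { "terms": { "field": "network.community_id" } },
    "interval": { "date_histogram": { "field": "event.start", "fixed_interval": "5m" } }
  },
  "aggregations": {
    "flow": {
      "scripted_metric": {
        "init_script": "state.ref = null; state.fwd_bytes = 0L; state.rev_bytes = 0L; state.fwd_pkts = 0L; state.rev_pkts = 0L;",
        "map_script": """
          // Attribute each record to the forward or reverse direction by
          // comparing its source.ip against the first source.ip seen (assumption).
          String sip = doc['source.ip'].value.toString();
          if (state.ref == null) { state.ref = sip; }
          long b = doc['source.bytes'].size() > 0 ? doc['source.bytes'].value : 0L;
          long p = doc['source.packets'].size() > 0 ? doc['source.packets'].value : 0L;
          if (sip.equals(state.ref)) { state.fwd_bytes += b; state.fwd_pkts += p; }
          else { state.rev_bytes += b; state.rev_pkts += p; }
        """,
        "combine_script": "return state;",
        "reduce_script": """
          // Merge per-shard states, flipping directions where a shard's
          // reference IP disagrees with the first non-empty shard's.
          String ref = null; long fb = 0; long rb = 0; long fp = 0; long rp = 0;
          for (s in states) {
            if (s.ref == null) { continue; }
            if (ref == null) { ref = s.ref; }
            if (s.ref.equals(ref)) { fb += s.fwd_bytes; rb += s.rev_bytes; fp += s.fwd_pkts; rp += s.rev_pkts; }
            else { fb += s.rev_bytes; rb += s.fwd_bytes; fp += s.rev_pkts; rp += s.fwd_pkts; }
          }
          return ['source_bytes': fb, 'destination_bytes': rb, 'source_packets': fp, 'destination_packets': rp];
        """
      }
    }
  }
}
```

`scripted_metric` is on the list of aggregations transforms support, but Painless per document is not free; at 14M records per 5 minutes you would want to benchmark this against an ingest-time alternative (e.g. enriching or pairing flows before indexing) before committing to it.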
My source data looks something like this:
| network.community_id | event.start | event.end | source.ip | destination.ip | source.bytes | destination.bytes |
|---|---|---|---|---|---|---|
| 1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= | Nov 29, 2024 14:00:02.511 | Nov 29, 2024 14:00:05.511 | 1.1.1.1 | 2.2.2.2 | 209 | (empty) |
| 1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= | Nov 29, 2024 14:00:01.001 | Nov 29, 2024 14:00:02.001 | 2.2.2.2 | 1.1.1.1 | 92 | (empty) |
| 1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= | Nov 29, 2024 13:59:59.667 | Nov 29, 2024 14:00:59.671 | 1.1.1.1 | 2.2.2.2 | 209 | (empty) |
| 1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= | Nov 29, 2024 13:59:59.650 | Nov 29, 2024 13:59:59.656 | 2.2.2.2 | 1.1.1.1 | 92 | (empty) |
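For these four rows, the output I am after would collapse each `community.id` into a single document per interval, something like the sketch below (ignoring, for brevity, that the two 13:59:59 rows fall into the previous 5-minute bucket). The byte counts come from summing each direction's `source.bytes` (209 + 209 for 1.1.1.1 → 2.2.2.2, 92 + 92 for the reverse); which direction counts as "source" in the result depends on the attribution rule chosen:

```json
{
  "community_id": "1:+2EY1YHQN0Vymfu0X5xzHh1g8g0=",
  "interval": "2024-11-29T14:00:00.000Z",
  "source_ip": "1.1.1.1",
  "destination_ip": "2.2.2.2",
  "source_bytes": 418,
  "destination_bytes": 184,
  "source_packets": "...",
  "destination_packets": "..."
}
```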