Transforming (aggregating) network logs 14 million records every 5 min

I have 14 million records arriving every 5 minutes from Filebeat into a data stream (NetFlow data). My goal is to aggregate this data based on certain rules, such as summing source bytes and destination bytes for specific protocols. Here's the challenge:

  1. Grouping: Data is grouped using a key (community.id) within 5-minute intervals.
  2. Missing Data Issue:
    The destination.bytes field in the source data is always empty.
    I need to derive destination.bytes and destination.packets based on IP flow (e.g., bidirectional, unidirectional flows).
  3. Volume and Velocity:
    Handling this massive volume and velocity of data requires efficient transformation logic.
  4. Elastic Feasibility:
    Can Elasticsearch handle this scale effectively?
    How should I configure the transformation pipeline to achieve this aggregation reliably?

I'm running Elastic on Azure, with a 3-node setup.

Transformation Configuration:

Below is the current transformation definition in Elasticsearch. It attempts to derive destination data (bytes and packets) from reverse flows, but I need advice on whether this approach is optimal and achievable for my use case.

`PUT _transform/odp
{
  "source": {
    "index": "logs-netflow.log*",
    "query": {
      "bool": {
        "must": [
          {
            "range": {
              "event.start": {
                "gte": "now-15m",
                "lt": "now"
              }
            }
          },
          {
            "terms": {
              "network.iana_number": [17, "17"]
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "odp_prep_hscn_netflow_simple_idx"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "event.start",
      "delay": "1m"
    }
  },
  "pivot": {
    "group_by": {
      "community_id": {
        "terms": {
          "field": "network.community_id"
        }
      },
      "interval": {
        "date_histogram": {
          "field": "event.start",
          "fixed_interval": "5m"
        }
      },
      "source_ip": {
        "terms": {
          "field": "source.ip"
        }
      },
      "destination_ip": {
        "terms": {
          "field": "destination.ip"
        }
      }
    },
    "aggregations": {
      "source_bytes": {
        "sum": {
          "field": "source.bytes"
        }
      },
      "destination_bytes": {
        "bucket_script": {
          "buckets_path": {
            "reverse_source_bytes": "source_bytes"
          },
          "script": """
            // Sum the source.bytes where destination.ip matches source.ip in reverse flows
            return params.reverse_source_bytes != null ? params.reverse_source_bytes : 0;
          """
        }
      },
      "source_packets": {
        "sum": {
          "field": "source.packets"
        }
      },
      "destination_packets": {
        "bucket_script": {
          "buckets_path": {
            "reverse_source_packets": "source_packets"
          },
          "script": """
            // Sum the source.packets where destination.ip matches source.ip in reverse flows
            return params.reverse_source_packets != null ? params.reverse_source_packets : 0;
          """
        }
      },
      "sorted_event": {
        "top_metrics": {
          "metrics": [
            { "field": "event.created", "missing": null },
            { "field": "event.duration", "missing": 0 },
            { "field": "event.ingested", "missing": null },
            { "field": "network.direction", "missing": "unknown" },
            { "field": "agent.name", "missing": "unknown" },
            { "field": "input.type", "missing": "netflow" },
            { "field": "network.transport", "missing": "udp" },
            { "field": "network.iana_number", "missing": 17 },
            { "field": "netflow.ip_class_of_service", "missing": -1 },
            { "field": "netflow.ip_precedence", "missing": -1 },
            { "field": "netflow.exporter.address", "missing": "unknown" },
            { "field": "netflow.ingress_interface", "missing": -1 },
            { "field": "netflow.egress_interface", "missing": -1 },
            { "field": "netflow.icmp_type_ipv4", "missing": -1 },
            { "field": "netflow.icmp_code_ipv4", "missing": -1 },
            { "field": "tcp_control_bits", "missing": 0 },
            { "field": "source.locality", "missing": "internal" },
            { "field": "destination.locality", "missing": "internal" },
            { "field": "source.nat.ip", "missing": "0.0.0.0" },
            { "field": "destination.nat.ip", "missing": "0.0.0.0" },
            { "field": "source.nat.port", "missing": 0 },
            { "field": "destination.nat.port", "missing": 0 },
            { "field": "observer.ip", "missing": "0.0.0.0" },
            { "field": "source.ip", "missing": "0.0.0.0" },
            { "field": "destination.ip", "missing": "0.0.0.0" },
            { "field": "source.port", "missing": null },
            { "field": "destination.port", "missing": null },
            { "field": "tags" },
            { "field": "response_count", "missing": 0 },
            { "field": "source.bytes", "missing": 0 },
            { "field": "destination.bytes", "missing": 0 },
            { "field": "source.packets", "missing": 0 },
            { "field": "destination.packets", "missing": 0 },
            { "field": "netflow.exporter.ip", "missing": null },
            { "field": "netflow.exporter.port", "missing": 0 },
            { "field": "request_count", "missing": 0 },
            { "field": "event.start", "missing": null },
            { "field": "event.end", "missing": null }
          ],
          "sort": [
            {
              "event.start": {
                "order": "desc"
              }
            }
          ],
          "size": 1
        }
      }
    }
  },
  "settings": {
    "docs_per_second": 500000,
    "max_page_search_size": 65535
  }
}`

My source data looks something like

network.community_id event.start event.end source.ip destination.ip source.bytes Destination bytes
1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= Nov 29, 2024 14:00:02.511 Nov 29, 2024 14:00:05.511 1.1.1.1 2.2.2.2 209 Empty
1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= Nov 29, 2024 14:00:01.001 Nov 29, 2024 14:00:02.001 2.2.2.2 1.1.1.1 92 Empty
1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= Nov 29, 2024 13:59:59.667 Nov 29, 2024 13:59:59.671 1.1.1.1 2.2.2.2 209 Empty
1:+2EY1YHQN0Vymfu0X5xzHh1g8g0= Nov 29, 2024 13:59:59.65 Nov 29, 2024 13:59:59.656 2.2.2.2 1.1.1.1 92 Empty

Not sure if it helps, but when you say you are streaming netflow data, and netflow data is pretty common, there's a chance you are trying to solve a problem already solved someplace else.

e.g. have you looked at logstash/filebeat netflow modules, maybe can help?

Firstly, thank you for responding to my query.
I already have the data coming through via Filebeat.

My core issue is aggregating 14 million records every 5 minutes via Elastic transforms. I intend to calculate the total sum of source bytes and destination bytes over each 5-minute interval, but because the destination bytes column has no data, I have to do a complex calculation to derive destination bytes from the source bytes column. As a result, the transforms are taking a long time to process and aggregate the data.

I'm trying to seek some guidance on how to structure the code architecture for this high volume and velocity of data.
Is it even possible to aggregate such a volume of data in Elastic? Please can you shed some light on these issues or provide some guidance?

OK, to answer points separately.

14 million docs per 5 minutes does not sound like a massive volume. That's processing less than 50k docs per second (14,000,000 / 300 s ≈ 46.7k). I've managed (on-prem and cloud) systems with 5x that per-second volume at peak, did tons of complex aggregations on that volume in near real-time. And had I needed more, I could have scaled horizontally. So yes, the volume is manageable by an appropriate Elastic cluster - aggregating millions of docs quickly is standard stuff.

"The destination.bytes field in the source data is always empty"

Really? Why? I'm still wondering why your netflow data is sort of incomplete and you are having to do these "complex calculations".

Anyways, someone else can likely better advise on the ideal elastic transformer way, your "script" seems to me doable in a couple of lines in awk, perl, or python.

A long time ago (kibana 5.x vintage) I did something not dissimilar with the elasticdump utility and an awk script effectively acting as a filter - read data from one index, munge as required, write to another index. Ugly and difficult to monitor/maintain, but worked for a couple of years. Might still be working for all I know :slight_smile:

A poor man's ETL tool.

Thanks @RainTown (Kevin) for confirming that, given the volume and velocity of the data, 14M records in 5 min is doable.

It's nice to know that we could use awk scripts, but here I'm trying to process and aggregate time-series data in real time.

How to do this the Elastic transform way is now the question. Having said that, my transform with the basic code above is processing data at just:
Records per second: 12167 documents / 43.8 seconds = ~278 documents/second
search_time_in_ms (43488 ms) is significantly higher than processing_time_in_ms (393 ms),

I'm using the node configuration below; I'm just hoping someone can help point me to the solution.

id                     ip            heap.percent     rc   rm ram.percent cpu load_1m load_5m load_15m node.role master name                jdk      qcm      du       d    dt   dup
e7piAK0XTLW654JHA8qR2A 10.46.232.13            27  7.1gb  8gb          89  51    2.53    2.86     2.88 rw        -      instance-0000000176 23    30.3mb  44.9gb   1.5tb 1.5tb  2.81
RFHv07eDQZu1TgN-vvVj0Q 10.46.233.0             75  7.9gb  8gb         100  98    2.41    2.34     2.27 rw        -      instance-0000000137 23      43mb    88gb   1.4tb 1.5tb  5.50
8kxxGNeNSue1vZs5WUoNTw 10.46.233.1             69  7.9gb  8gb         100  22    1.28    1.18     1.19 rw        -      instance-0000000136 23    34.3mb    77gb   1.4tb 1.5tb  4.82
a0WMPWALRKSChEWzwBfj0Q 10.46.232.122           41  1.8gb  2gb          93   4    1.28    1.21     1.29 mr        *      instance-0000000191 23        0b  62.6mb   3.9gb   4gb  1.53
L6IhUISsTRqUGXJPcpp47Q 10.46.232.140           54  1.6gb  2gb          82   0    3.79    3.36     3.50 mr        -      instance-0000000184 23        0b  54.7mb   3.9gb   4gb  1.34
kpWzvfwDSeG53zkkOfl-uA 10.46.232.248           37  1.6gb  2gb          80   0    3.31    2.72     2.67 mr        -      instance-0000000193 23        0b  22.5mb   3.9gb   4gb  0.55
Zf68tAsTT4iBrEiQNAmqRw 10.46.232.248           25  4.5gb  8gb          57  19    3.31    2.72     2.67 lr        -      instance-0000000190 23        0b 150.8mb  15.8gb  16gb  0.92
ac7qKV6uRTONt4-FIsTjlg 10.46.232.140           63    4gb  8gb          51   0    3.79    3.36     3.50 lr        -      instance-0000000197 23        0b  11.5mb  15.9gb  16gb  0.07
7vwLsSAuSqGOIRsdqLMfMA 10.46.232.234           66 33.7gb 60gb          56   1    0.18    0.29     1.13 ir        -      instance-0000000194 23        0b 103.9mb 119.8gb 120gb  0.08
zsZhd_sISuC-y42o3ljBbg 10.46.232.235           17 33.1gb 60gb          55   0    0.53    0.43     1.25 ir        -      instance-0000000199 23        0b   106mb 119.8gb 120gb  0.09
urM1QCJySu-bnq3W10uS6w 10.46.232.237           59 33.6gb 60gb          56   0    0.07    0.35     1.16 ir        -      instance-0000000198 23        0b 105.6mb 119.8gb 120gb  0.09
pxLNtkeBSF240-EfKky8Ew 10.46.232.216           33 33.6gb 60gb          56   0    0.26    0.42     1.21 ir        -      instance-0000000195 23        0b 102.4mb 119.8gb 120gb  0.08
4ocSbJEGRq2RHPZAtHf7oQ 10.46.232.243           50   54gb 60gb          90  35    7.62    7.52     9.21 hrst      -      instance-0000000173 23   615.8mb 600.4gb   1.3tb 1.9tb 29.90
4T0YOZ3rR1uOmMMBMWKHnA 10.46.232.175           50 55.5gb 60gb          93  21    6.39    6.14    12.24 hrst      -      instance-0000000166 23   709.2mb 616.4gb   1.3tb 1.9tb 30.70
HOz-iH8MQgiqnYZT4Mb9Jg 10.46.232.244           21 54.1gb 60gb          90  36    5.72    6.76     9.03 hrst      -      instance-0000000182 23     616mb   618gb   1.3tb 1.9tb 30.78
JY_qW2HTSgKT9QFI86zn_w 10.46.232.200           31 54.3gb 60gb          91  37    7.70    8.15     9.94 hrst      -      instance-0000000167 23   579.4mb 626.8gb   1.3tb 1.9tb 31.22
AthaBOFUQ5O5rwEw8f8jfg 10.46.232.183           71 53.7gb 60gb          90  32    6.60    7.81     9.41 hrst      -      instance-0000000178 23   616.2mb 610.8gb   1.3tb 1.9tb 30.42
Iew0o2tYRvuCTplnQDs4wg 10.46.232.239           61 54.4gb 60gb          91  24    7.26    7.27     8.90 hrst      -      instance-0000000177 23   652.8mb 617.7gb   1.3tb 1.9tb 30.76
WojWSBugQbCByNrUJRzepg 10.46.232.233           32 54.7gb 60gb          91  36    6.47    7.22     9.73 hrst      -      instance-0000000181 23     1.2gb 620.4gb   1.3tb 1.9tb 30.90
L4ZY2eUGTAm5H-ZswFfAww 10.46.232.187           55 54.3gb 60gb          91  38    7.51    7.88     9.22 hrst      -      instance-0000000180 23   628.6mb 608.9gb   1.3tb 1.9tb 30.32
HGd5irf8TWmWKHrROIBH5A 10.46.232.238           46 54.6gb 60gb          91  32    5.16    6.91    10.32 hrst      -      instance-0000000179 23  1015.2mb 623.5gb   1.3tb 1.9tb 31.05

FYI my script, which in fairness was for a much simpler problem (I think it was eventually a Python script), on hardware from approx 6 years ago, was a lot faster than whatever you are doing, and answers were available in almost real-time. There is value in simplicity - KISS.

Your cluster is decent size, but it's surely doing a lot more than JUST what you are describing here. For a start there is presumably the indexing into logs-netflow.log* of the data you are trying to later aggregate, right? And I guess others are doing other stuff with the data inside the cluster there too?

I don't know how optimal scripting inside aggregations is going to be; there might be some tweaks others can suggest. There are suggestions in the documentation: Working with transforms at scale | Elasticsearch Guide [8.16] | Elastic

By the way, I've never used the scripting stuff in Elasticsearch, but this example in the docs:

doesn't actually look too dissimilar to my understanding of what you are trying to do, which it does by way of nested aggregations.

Yes. Absolutely, but "It depends" on many things... as @RainTown said, this cluster is certainly doing other work.

I would first start with a simple transformation and build it up a piece at a time.

You have to do the bucket scripting, but that is an area where it is easy to write poor code... like not using parameters etc...

Also, if you need to derive data, it might be better to do it at ingest time if possible...

Also if NetFlow is key for your use case I would perhaps suggest looking at our partner https://www.elastiflow.com/ which specializes in NetFlow

cc @rcowart

2 Likes

by the way, does that code really do what you say it does? Comment says it sums a field, but I don't see a "sum" or a "+" operation?

Hello @RainTown

In my code, I'm using the following block to calculate the sum of source.bytes. Since the destination.bytes column is not populated, I’ve attached source.bytes to destination.ip to derive the destination.bytes and calculate their sum.

"aggregations": {
      "source_bytes": {
        "sum": {
          "field": "source.bytes"
        }
      },
      "destination_bytes": {
        "bucket_script": {
          "buckets_path": {
            "reverse_source_bytes": "source_bytes"
          },
          "script": """
            // Sum the source.bytes where destination.ip matches source.ip in reverse flows
            return params.reverse_source_bytes != null ? params.reverse_source_bytes : 0;
          """
        }
      },
      "source_packets": {
        "sum": {
          "field": "source.packets"
        }
      },
      "destination_packets": {
        "bucket_script": {
          "buckets_path": {
            "reverse_source_packets": "source_packets"
          },
          "script": """
            // Sum the source.packets where destination.ip matches source.ip in reverse flows
            return params.reverse_source_packets != null ? params.reverse_source_packets : 0;
          """
        }
      },

Also, I do get output like the below:

  "community_id": "1:2Jb/wNADsvbB2wesmzWsJODJY+Y=",
      "source_packets": 1,
      "destination_ip": "10.0.0.0.0",
      "source_bytes": 110,
      "interval": "2024-12-03T20:00:00.000Z",
      "sorted_event": {
        "event.created": "2024-12-03T20:09:26.618Z",
        "event.duration": "0",
        "event.ingested": "2024-12-03T20:09:27Z",
        "network.direction": "inbound",
        "agent.name": "odp-prep-netflow-elasticagent-ingest-a",
        "input.type": "netflow",
        "network.transport": "udp",
        "network.iana_number": "17",
        "netflow.ip_class_of_service": "0",
        "netflow.ip_precedence": "-1",
        "netflow.exporter.address": "10.1.1.1.1.:2000",
        "netflow.ingress_interface": "500011517",
        "netflow.egress_interface": "500021516",
        "netflow.icmp_type_ipv4": "-1",
        "netflow.icmp_code_ipv4": "-1",
        "tcp_control_bits": "0",
        "source.locality": "external",
        "destination.locality": "internal",
        "source.nat.ip": "0.0.0.0",
        "destination.nat.ip": "0.0.0.0",
        "source.nat.port": "0",
        "destination.nat.port": "0",
        "observer.ip": "1.1.1.1.",
        "source.ip": "2.2.2.2",
        "destination.ip": "3.3.3.3.",
        "source.port": "443",
        "destination.port": "60",
        "tags": "30053",
        "response_count": "0",
        "source.bytes": "110",
        "destination.bytes": "0",
        "source.packets": "1",
        "destination.packets": "0",
        "netflow.exporter.ip": "null",
        "netflow.exporter.port": "0",
        "request_count": "0",
        "event.start": "2024-12-03T20:04:06.000Z",
        "event.end": "2024-12-03T20:04:06.000Z"
      },
      "destination_bytes": 110,
      "source_ip": "2.2.2.2",
      "destination_packets": 1
    },