Logstash: XML recursive split

Hello, everyone!

I am attempting to parse a large XML file that looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<organisaties>
	<organisatie systeemId="01">
		<naam>ORG_01</naam>
	</organisatie>
	<organisatie systeemId="02">
		<naam>ORG_02</naam>
		<organisaties>
			<organisatie systeemId="02_01">
				<naam>ORG_02_01</naam>
			</organisatie>
			<organisatie systeemId="02_02">
				<naam>ORG_02_02</naam>
				<organisaties>
					<organisatie systeemId="02_02_01">
						<naam>ORG_02_02_01</naam>
					</organisatie>
					<organisatie systeemId="02_02_02">
						<naam>ORG_02_02_02</naam>
						<organisaties>
							<organisatie systeemId="02_02_02_01">
								<naam>ORG_02_02_02_01</naam>
							</organisatie>
							<organisatie systeemId="02_02_02_02">
								<naam>ORG_02_02_02_02</naam>
							</organisatie>
						</organisaties>
					</organisatie>
				</organisaties>
			</organisatie>
		</organisaties>
	</organisatie>
</organisaties>

The actual file contains approximately 1,000,000 lines, with up to six levels of nesting for "organisaties".

I want to extract every single organization from the file while retaining information about its parent for sub-organizations.

For example, I expect the following split:

{
	"systeemId": "01",
	"naam": "ORG_01",
	"parentSysteemId": null,
	"parentNaam": null,
	"depth_level": 0
}
{
	"systeemId": "02",
	"naam": "ORG_02",
	"parentSysteemId": "01",
	"parentNaam": "ORG_01",
	"depth_level": 1
}
{
	"systeemId": "02_01",
	"naam": "ORG_02_01",
	"parentSysteemId": "02",
	"parentNaam": "ORG_02",
	"depth_level": 2
}
{
	"systeemId": "02_02",
	"naam": "ORG_02_02",
	"parentSysteemId": "02",
	"parentNaam": "ORG_02",
	"depth_level": 2
}
{
	"systeemId": "02_02_01",
	"naam": "ORG_02_02_01",
	"parentSysteemId": "02_02",
	"parentNaam": "ORG_02_02",
	"depth_level": 3
}
{
	"systeemId": "02_02_02",
	"naam": "ORG_02_02_02",
	"parentSysteemId": "02_02",
	"parentNaam": "ORG_02_02",
	"depth_level": 3
}
{
	"systeemId": "02_02_02_01",
	"naam": "ORG_02_02_02_01",
	"parentSysteemId": "02_02_02",
	"parentNaam": "ORG_02_02_02",
	"depth_level": 4
}
{
	"systeemId": "02_02_02_02",
	"naam": "ORG_02_02_02_02",
	"parentSysteemId": "02_02_02",
	"parentNaam": "ORG_02_02_02",
	"depth_level": 4
}

I have created three pipelines that operate as follows:

Pipeline_1 downloads the initial file, performs an initial split, and forwards the event to Pipeline_2 if the organization contains its own sub-organizations, adding the parent systeemId and parent naam to the event. If there are no sub-organizations, the event is sent to Pipeline_3, which renames some fields and applies additional mutations.

Pipeline_2 processes each sub-organization by splitting it further. If a sub-organization has its own sub-organizations, it forwards the event back to Pipeline_2 for additional processing. If not, it passes the event to Pipeline_3.

Pipeline_3 focuses on formatting the event and making it more presentable.

While this solution works well for small datasets, the production file exhausts the available RAM and the service is occasionally terminated silently. I have already increased the JVM heap to 12 GB. I am running Logstash 8.15.0 in a Docker container with four cores allocated.

I also noticed that after the initial split, organizations without sub-organizations are not immediately forwarded to Pipeline_3. Instead, it seems that all events are collected before they are sent to Pipeline_3. For example, with stdout { codec => rubydebug } I receive all the events at once rather than seeing them delivered one by one. Is this expected?

So if anyone has ideas on how I can optimize my solution, or can shed some light on the question above, I would appreciate it.

The pipeline configurations are below:

[pipeline_1.conf]

input {
  http_poller {
    urls => {
        url => "https://my_file.com"
    }
    request_timeout => 6000
    schedule => { every => "10000m" }
    codec => "plain"
    metadata_target => "http_poller_metadata"
  }
}


filter {

    mutate {
        remove_field => ["http_poller_metadata", "event", "@version", "@timestamp", "host", "name"]
    }

    xml {
        source => "message"
        force_array => false
        target => "parsed_xml"
        remove_namespaces => true
        remove_field => ["@version", "@timestamp"]
    }

    mutate {
        remove_field => ["message"]
    }

    # With force_array => false, if [parsed_xml][organisaties] contains only one [organisatie] element,
    # it is parsed as a hash rather than an array, so the split filter would fail.
    # In that case we need to convert it to an array manually.
    ruby {
        code => '
            if event.get("[parsed_xml][organisaties][organisatie]").is_a?(Hash)
                event.set("[parsed_xml][organisaties][organisatie]", [event.get("[parsed_xml][organisaties][organisatie]")])
            end
        '
    }

    # Split the main organizations
    split {
        field => "[parsed_xml][organisaties][organisatie]"
        target => "organisatie"
        remove_field => ["parsed_xml", "@version", "@timestamp"]
    }

    mutate {
        add_field => {"[organisatie][org_depth]" => 1}
    }

    mutate {
        convert => {"[organisatie][org_depth]" => "integer"}
    }

    # If organisation contains suborganisations, we clone this document and mark with the special tag
    # The marked document will be sent to the pipeline_2, while the original document will be sent to pipeline_3
    if [organisatie][organisaties] {
        
        clone {
            clones => ["suborganisations_are_present"]
        }

        if "suborganisations_are_present" not in [tags] {
            mutate {
                remove_field => ["[organisatie][organisaties]", "@version", "@timestamp"]
            }
        } else {
            mutate {
                rename => { "[organisatie]" => "event" }
            }
        }
    }
}

# Docs that do not contain suborganisations will be sent to the pipeline_3
# while the rest are sent to the pipeline_2 to continue processing suborganisations in a recursive way
output {
    if "suborganisations_are_present" in [tags] {
        pipeline {
            send_to => "pipeline_2"
        }
    } else {
        pipeline {
            send_to => "pipeline_3"
        }
    }
}
[pipeline_2.conf]

input {
  pipeline {
    address => "pipeline_2"
  }
}

filter {

    mutate {
        remove_field => ["tags", "@version", "@timestamp"]
    }

    # As in pipeline_1, a single nested [organisatie] element arrives as a hash rather than an array,
    # which would break the split filter, so convert it to an array manually.
    ruby {
        code => '
            if event.get("[event][organisaties][organisatie]").is_a?(Hash)
                event.set("[event][organisaties][organisatie]", [event.get("[event][organisaties][organisatie]")])
            end
        '
    }

    split {
        field => "[event][organisaties][organisatie]"
        target => "organisatie"
        add_field => {
            "[organisatie][parent_naam]" => "%{[event][naam]}"
            "[organisatie][parent_systeemId]" => "%{[event][p:systeemId]}"
            "[organisatie][org_depth]" => "%{[event][org_depth]}"
        }
        remove_field => ["event", "@version", "@timestamp"]
    }

    ruby {
        code => '
            event.set("[organisatie][org_depth]", event.get("[organisatie][org_depth]").to_i + 1)
        '
    }

    # If suborganisation contains its own subsuborganisations, we clone this document and mark with the special tag
    # The marked document will be sent to the pipeline_2 again, while the original document will be sent to pipeline_3
    if [organisatie][organisaties] {
        
        clone {
            clones => ["suborganisations_are_present"]
        }

        if "suborganisations_are_present" not in [tags] {
            mutate {
                remove_field => ["[organisatie][organisaties]", "@version", "@timestamp"]
            }
        } else {
            mutate {
                rename => { "[organisatie]" => "event" }
            }
        }
    }
}

output {
    if "suborganisations_are_present" in [tags] {
        pipeline {
            send_to => "pipeline_2"
        }
    } else {
        pipeline {
            send_to => "pipeline_3"
        }
    }
}
[pipeline_3.conf]

input {
  pipeline {
    address => "pipeline_3"
  }
}

filter {
  mutate {
    <...>
  }

  if [organisatie][parent_naam] {
      mutate {
          rename => { "[organisatie][parent_naam]" => "parent_naam" }
      }
  }

  if [organisatie][parent_systeemId] {
      mutate {
          rename => { "[organisatie][parent_systeemId]" => "parent_systeemId" }
      }
  }

  mutate {
    remove_field => ["@version", "@timestamp"]
  }

}

output {
    <...> # stdout or sending to ES
}

I believe it is. If you look at the code of the split filter, it removes the array it is going to split from the event[1], then iterates over the array, and for each entry it creates a clone of the source event and adds that entry from the array to it. The filter then calls yield for the cloned event.

yield is a ruby construct that allows you to call a code block with an argument. (The argument being the cloned event in this case.)

I can't be sure, but I think this is the code block being called. That creates an array and keeps adding entries (events) to it until the filter has finished with the source event, at which point the entire array of events will get flushed down the pipeline.

That means you will just get a single burst of events when the split completes.

The only way I can think of to change that behaviour would be to resort to a ruby filter. Copy code from the split filter, but instead of calling yield, send the events to another pipeline.

One way to do that would be to open a TCP connection to another pipeline that has a tcp input, and write each event to it as a line of JSON. You may hit size limits there. I vaguely recall there being a 16 KB line limit for the tcp input; even if that has since been increased, very large events (64 KB, say) might still cause an issue. I haven't tested any of this.
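
Roughly, the ruby filter could iterate over the nested array itself and write each entry to the other pipeline as a line of JSON. A minimal sketch along those lines (untested; the field path, the 127.0.0.1:5045 endpoint, and opening a connection per event are all assumptions, and a real version would reuse the socket and handle errors):

filter {
  ruby {
    init => '
      require "socket"
      require "json"
    '
    code => '
      # assumed field path; adjust to wherever the nested array actually lives
      orgs = event.get("[organisatie][organisaties][organisatie]")
      orgs = [orgs] if orgs.is_a?(Hash)   # a single child arrives as a hash
      unless orgs.nil?
        sock = TCPSocket.new("127.0.0.1", 5045)   # port of the tcp input (assumption)
        orgs.each do |org|
          child = {
            "organisatie"      => org,
            "parent_naam"      => event.get("[organisatie][naam]"),
            "parent_systeemId" => event.get("[organisatie][systeemId]")
          }
          sock.puts(child.to_json)   # one JSON document per line
        end
        sock.close
      end
    '
  }
}

# ...and the receiving pipeline would read them back with something like:
input {
  tcp {
    port => 5045
    codec => json_lines
  }
}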

Another possibility would be to set up a temporary directory, have your ruby filter write each event to a separate file in it, and have the other pipeline monitor that directory using a file input. As long as you don't hit inode re-use it should be OK. Not sure if using a GUID or sequence number for the file name would help; I suspect not.
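
A sketch of that variant, under the same caveats (untested; the directory, field path, and UUID file names are arbitrary choices):

filter {
  ruby {
    init => '
      require "securerandom"
      require "json"
      require "fileutils"
      FileUtils.mkdir_p("/tmp/org_events")   # scratch directory (assumption)
    '
    code => '
      orgs = event.get("[organisatie][organisaties][organisatie]")
      orgs = [orgs] if orgs.is_a?(Hash)
      Array(orgs).each do |org|
        # one file per sub-organisation; a real version would write to a temp name
        # and rename it to avoid the file input reading a partially written file
        File.write("/tmp/org_events/#{SecureRandom.uuid}.json", org.to_json)
      end
    '
  }
}

# ...with the other pipeline reading that directory:
input {
  file {
    path => "/tmp/org_events/*.json"
    mode => "read"
    codec => "json"
    file_completed_action => "delete"
  }
}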

You might want to ask another question about whether there is another intermediate system that can provide an append-only bounded-queue of arbitrarily large events between a logstash output and a logstash input. I would be very interested to see the results.

file or tcp? Probably not. kafka/MQ/RabbitMQ? maybe. There are at least half a dozen others where I have no idea whether the size limit is small or large.


  1. Cloning the event without deleting the source field and then overwriting the source field would be a memory-management disaster; it didn't take long for that bug to be noticed and fixed.

Hey @Badger!

Sorry for the late response -_-

Thank you so much for your detailed explanation and recommendations!

I experimented with your suggestions but ultimately ended up handling the entire process in Python: I loaded the XML, performed a recursive split, enriched each extracted object, and then sent everything to Elasticsearch via the bulk API.
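
For anyone landing here later, the core of that script looked roughly like this (a simplified sketch rather than the exact code; the file name, index name, and Elasticsearch URL are placeholders, and the element names match the sample above):

import xml.etree.ElementTree as ET
from elasticsearch import Elasticsearch, helpers

def walk(org, parent=None, depth=0):
    """Recursively yield one flat document per <organisatie> element."""
    yield {
        "systeemId": org.get("systeemId"),
        "naam": org.findtext("naam"),
        "parentSysteemId": parent.get("systeemId") if parent is not None else None,
        "parentNaam": parent.findtext("naam") if parent is not None else None,
        "depth_level": depth,
    }
    # descend into any nested <organisaties><organisatie> blocks
    for child in org.findall("./organisaties/organisatie"):
        yield from walk(child, parent=org, depth=depth + 1)

tree = ET.parse("organisaties.xml")                     # placeholder file name
docs = (d for top in tree.getroot().findall("organisatie") for d in walk(top))

es = Elasticsearch("http://localhost:9200")             # placeholder URL
helpers.bulk(es, ({"_index": "organisaties", "_source": d} for d in docs))

Passing the parent element down the recursion yields the parentSysteemId, parentNaam, and depth_level fields directly, which is what the chained pipelines were trying to emulate.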

This approach worked perfectly for my case.

Thanks again for your input!