Hello, everyone!
I am attempting to parse a large XML file that looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<organisaties>
  <organisatie systeemId="01">
    <naam>ORG_01</naam>
  </organisatie>
  <organisatie systeemId="02">
    <naam>ORG_02</naam>
    <organisaties>
      <organisatie systeemId="02_01">
        <naam>ORG_02_01</naam>
      </organisatie>
      <organisatie systeemId="02_02">
        <naam>ORG_02_02</naam>
        <organisaties>
          <organisatie systeemId="02_02_01">
            <naam>ORG_02_02_01</naam>
          </organisatie>
          <organisatie systeemId="02_02_02">
            <naam>ORG_02_02_02</naam>
            <organisaties>
              <organisatie systeemId="02_02_02_01">
                <naam>ORG_02_02_02_01</naam>
              </organisatie>
              <organisatie systeemId="02_02_02_02">
                <naam>ORG_02_02_02_02</naam>
              </organisatie>
            </organisaties>
          </organisatie>
        </organisaties>
      </organisatie>
    </organisaties>
  </organisatie>
</organisaties>
The actual file is approximately 1,000,000 lines long, with "organisaties" nested up to six levels deep.
I want to extract every organization from the file and, for each sub-organization, retain information about its parent.
For example, I expect the following split:
{
  "systeemId": "01",
  "naam": "ORG_01",
  "parentSysteemId": null,
  "parentNaam": null,
  "depth_level": 0
}
{
  "systeemId": "02",
  "naam": "ORG_02",
  "parentSysteemId": null,
  "parentNaam": null,
  "depth_level": 0
}
{
  "systeemId": "02_01",
  "naam": "ORG_02_01",
  "parentSysteemId": "02",
  "parentNaam": "ORG_02",
  "depth_level": 1
}
{
  "systeemId": "02_02",
  "naam": "ORG_02_02",
  "parentSysteemId": "02",
  "parentNaam": "ORG_02",
  "depth_level": 1
}
{
  "systeemId": "02_02_01",
  "naam": "ORG_02_02_01",
  "parentSysteemId": "02_02",
  "parentNaam": "ORG_02_02",
  "depth_level": 2
}
{
  "systeemId": "02_02_02",
  "naam": "ORG_02_02_02",
  "parentSysteemId": "02_02",
  "parentNaam": "ORG_02_02",
  "depth_level": 2
}
{
  "systeemId": "02_02_02_01",
  "naam": "ORG_02_02_02_01",
  "parentSysteemId": "02_02_02",
  "parentNaam": "ORG_02_02_02",
  "depth_level": 3
}
{
  "systeemId": "02_02_02_02",
  "naam": "ORG_02_02_02_02",
  "parentSysteemId": "02_02_02",
  "parentNaam": "ORG_02_02_02",
  "depth_level": 3
}
I have created three pipelines that operate as follows:
Pipeline_1 downloads the initial file, performs the initial split, and forwards the event to Pipeline_2 if the organization contains sub-organizations, adding the parent systeemId and parent naam to the event. If there are no sub-organizations, the event is sent to Pipeline_3, which renames some fields and applies additional mutations.
Pipeline_2 splits each sub-organization further. If a sub-organization has its own sub-organizations, it forwards the event back to Pipeline_2 for another pass; otherwise it passes the event to Pipeline_3.
Pipeline_3 formats the event and makes it more presentable.
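For context, the three pipelines are wired together via pipelines.yml, roughly like this (the config paths are placeholders for my actual mounts):
[pipelines.yml]
- pipeline.id: pipeline_1
  path.config: "/usr/share/logstash/pipeline/pipeline_1.conf"
- pipeline.id: pipeline_2
  path.config: "/usr/share/logstash/pipeline/pipeline_2.conf"
- pipeline.id: pipeline_3
  path.config: "/usr/share/logstash/pipeline/pipeline_3.conf"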
While this solution works well for small datasets, I run into trouble with the production file due to insufficient RAM, which occasionally causes the service to be silently killed. I have already increased the JVM heap to 12 GB. I am using Logstash 8.15.0, running in a Docker container with four cores allocated.
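For reference, the heap is set through the container environment; a minimal sketch of the relevant setting (the exact run command differs):
LS_JAVA_OPTS=-Xms12g -Xmx12g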
I also noticed that after the initial split, organizations without sub-organizations are not immediately forwarded to Pipeline_3. Instead, it seems that all events are collected before they are sent on: with a stdout output using the rubydebug codec, I receive all the events at once rather than seeing them arrive one by one. Is this expected?
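For completeness, this is the debug output I used to observe it:
output {
  stdout { codec => rubydebug }
}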
So, if anyone has ideas on how I can optimize this solution, or can shed some light on the question above, I would be grateful.
The pipeline configurations are below:
[pipeline_1.conf]
input {
  http_poller {
    urls => {
      url => "https://my_file.com"
    }
    request_timeout => 6000
    schedule => { every => "10000m" }
    codec => "plain"
    metadata_target => "http_poller_metadata"
  }
}
filter {
  mutate {
    remove_field => ["http_poller_metadata", "event", "@version", "@timestamp", "host", "name"]
  }
  xml {
    source => "message"
    force_array => false
    target => "parsed_xml"
    remove_namespaces => true
    remove_field => ["@version", "@timestamp"]
  }
  mutate {
    remove_field => ["message"]
  }
  # With force_array => false, an [organisaties] element containing only one
  # [organisatie] child is parsed as a hash instead of an array.
  # The split filter cannot split a hash, so we convert it to an array manually.
  ruby {
    code => '
      if event.get("[parsed_xml][organisaties][organisatie]").is_a?(Hash)
        event.set("[parsed_xml][organisaties][organisatie]", [event.get("[parsed_xml][organisaties][organisatie]")])
      end
    '
  }
  # Split the top-level organizations
  split {
    field => "[parsed_xml][organisaties][organisatie]"
    target => "organisatie"
    remove_field => ["parsed_xml", "@version", "@timestamp"]
  }
  mutate {
    add_field => { "[organisatie][org_depth]" => 1 }
  }
  mutate {
    convert => { "[organisatie][org_depth]" => "integer" }
  }
  # If an organization contains sub-organizations, we clone the document and mark
  # the clone with a special tag. The tagged clone is sent to pipeline_2,
  # while the original document is sent to pipeline_3.
  if [organisatie][organisaties] {
    clone {
      clones => ["suborganisations_are_present"]
    }
    if "suborganisations_are_present" not in [tags] {
      mutate {
        remove_field => ["[organisatie][organisaties]", "@version", "@timestamp"]
      }
    } else {
      mutate {
        rename => { "[organisatie]" => "event" }
      }
    }
  }
}
# Documents without sub-organizations are sent to pipeline_3, while the rest go
# to pipeline_2 to continue processing sub-organizations recursively.
output {
  if "suborganisations_are_present" in [tags] {
    pipeline {
      send_to => "pipeline_2"
    }
  } else {
    pipeline {
      send_to => "pipeline_3"
    }
  }
}
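To illustrate why the ruby hash-to-array conversion above is needed, these are roughly the two shapes the xml filter produces with force_array => false (a sketch, not verbatim rubydebug output):
# One <organisatie> child: parsed as a hash
#   [parsed_xml][organisaties][organisatie] => { "systeemId" => "01", "naam" => "ORG_01" }
# Several <organisatie> children: parsed as an array of hashes
#   [parsed_xml][organisaties][organisatie] => [
#     { "systeemId" => "01", "naam" => "ORG_01" },
#     { "systeemId" => "02", "naam" => "ORG_02", "organisaties" => { ... } }
#   ]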
[pipeline_2.conf]
input {
  pipeline {
    address => "pipeline_2"
  }
}
filter {
  mutate {
    remove_field => ["tags", "@version", "@timestamp"]
  }
  # As in pipeline_1: if [event][organisaties] contains only one [organisatie]
  # element, it is parsed as a hash, which the split filter cannot split,
  # so we convert it to an array manually.
  ruby {
    code => '
      if event.get("[event][organisaties][organisatie]").is_a?(Hash)
        event.set("[event][organisaties][organisatie]", [event.get("[event][organisaties][organisatie]")])
      end
    '
  }
  split {
    field => "[event][organisaties][organisatie]"
    target => "organisatie"
    add_field => {
      "[organisatie][parent_naam]" => "%{[event][naam]}"
      "[organisatie][parent_systeemId]" => "%{[event][systeemId]}"
      "[organisatie][org_depth]" => "%{[event][org_depth]}"
    }
    remove_field => ["event", "@version", "@timestamp"]
  }
  # add_field produces a string, so increment the depth as an integer
  ruby {
    code => '
      event.set("[organisatie][org_depth]", event.get("[organisatie][org_depth]").to_i + 1)
    '
  }
  # If a sub-organization contains its own sub-organizations, we clone the document
  # and mark the clone with a special tag. The tagged clone is sent back to
  # pipeline_2, while the original document is sent to pipeline_3.
  if [organisatie][organisaties] {
    clone {
      clones => ["suborganisations_are_present"]
    }
    if "suborganisations_are_present" not in [tags] {
      mutate {
        remove_field => ["[organisatie][organisaties]", "@version", "@timestamp"]
      }
    } else {
      mutate {
        rename => { "[organisatie]" => "event" }
      }
    }
  }
}
output {
  if "suborganisations_are_present" in [tags] {
    pipeline {
      send_to => "pipeline_2"
    }
  } else {
    pipeline {
      send_to => "pipeline_3"
    }
  }
}
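To make the recursion easier to follow, this is roughly what a tagged clone looks like when it re-enters pipeline_2 (a sketch based on the sample file, not verbatim output):
{
  "event" => {
    "systeemId" => "02_02",
    "naam" => "ORG_02_02",
    "parent_naam" => "ORG_02",
    "parent_systeemId" => "02",
    "org_depth" => 2,
    "organisaties" => {
      "organisatie" => [
        { "systeemId" => "02_02_01", "naam" => "ORG_02_02_01" },
        { "systeemId" => "02_02_02", "naam" => "ORG_02_02_02", "organisaties" => { ... } }
      ]
    }
  },
  "tags" => ["suborganisations_are_present"]
}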
[pipeline_3.conf]
input {
  pipeline {
    address => "pipeline_3"
  }
}
filter {
  mutate {
    <...>
  }
  if [organisatie][parent_naam] {
    mutate {
      rename => { "[organisatie][parent_naam]" => "parent_naam" }
    }
  }
  if [organisatie][parent_systeemId] {
    mutate {
      rename => { "[organisatie][parent_systeemId]" => "parent_systeemId" }
    }
  }
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  <...> # stdout or sending to ES
}