Parse data between repeated XML tags

I have an XML file with repeated tags, and I need to extract the fields between those tags. In the XML data below, I want to get the data between the job_list tags:

<?xml version='1.0'?>

<job_info xmlns:xsd="https://schemas/qstat.xsd">
<queue_info>
<job_list state="running">
<JB_job_number>148635539</JB_job_number>
<JAT_prio>0.51090</JAT_prio>
<JAT_ntix>1.00000</JAT_ntix>
<JB_nppri>0.45090</JB_nppri>
<hard_request name="mem_avail" resource_contribution="0.000000">400G</hard_request>
<hard_req_queue>largemem.q</hard_req_queue>
<job_binding>NONE</job_binding>
</job_list>
<job_list state="running">
<JB_job_number>148711350</JB_job_number>
<JAT_prio>0.51090</JAT_prio>
<JAT_ntix>1.00000</JAT_ntix>
<JB_nppri>0.45090</JB_nppri>
<hard_request name="mem_avail" resource_contribution="0.000000">900G</hard_request>
<hard_request name="h_rt" resource_contribution="0.000000">518400</hard_request>
<hard_req_queue>largemem.q</hard_req_queue>
<job_binding>NONE</job_binding>
</job_list>
</queue_info>
<job_info>
<job_list state="pending">
<JB_job_number>133460680</JB_job_number>
<JAT_prio>0.00000</JAT_prio>
<JAT_ntix>0.00000</JAT_ntix>
<JB_nppri>0.00000</JB_nppri>
<hard_request name="mem_avail" resource_contribution="0.000000">1000M</hard_request>
<hard_req_queue>all.q</hard_req_queue>
<job_binding>NONE</job_binding>
</job_list>
<job_list state="pending">
<JB_job_number>140091542</JB_job_number>
<JAT_prio>0.00000</JAT_prio>
<JAT_ntix>0.00000</JAT_ntix>
<JB_nppri>0.00000</JB_nppri>
<hard_request name="arch" resource_contribution="0.000000">lx*</hard_request>
<hard_req_queue>all.q</hard_req_queue>
<job_binding>NONE</job_binding>
</job_list>
</job_info>
</job_info>

I tried the config below, but it did not seem to work:
input
{
    file
    {
        path => "/usr/share/logstash/qstat.cache"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        type => "xml"
        codec => multiline {
            pattern => "^</job_list>"
            negate => "true"
            what => "previous"
        }
    }
}
filter
{
    xml
    {
        source => "message"
        store_xml => false
        #target => "job_list"
        xpath => [
            "/job_list/@state", "Job_State",
            "/job_list/JB_job_number/text()", "Job_Number",
            "/job_list/JAT_prio/text()", "priority",
            "/job_list/JAT_ntix/text()", "ntix",
            "/job_list/JB_nppri/text()", "nppri",
            "/job_list/hard_req_queue/text()", "hard_req_queue",
            "/job_list/binding/text()", "binding"
        ]
    }
}

output
{
    stdout
    {
        codec => rubydebug
    }
}

This will not result in valid XML in each event. I suggest you consume the entire file as a single event, using a multiline codec with a pattern that never matches. For example:

codec => multiline { pattern => "^Spalanzani" what => "previous" negate => true auto_flush_interval => 2 }

You will then be able to parse elements from /job_info/queue_info/job_list or /job_info/job_info/job_list.
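
To make that concrete, here is a sketch of the full input block with that codec dropped into the file input from your config (same path and options as above):

input
{
    file
    {
        path => "/usr/share/logstash/qstat.cache"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        type => "xml"
        # "Spalanzani" never occurs in the file, so no line ever matches;
        # every line is appended to the previous one, and the whole file
        # is flushed as a single event when auto_flush_interval expires.
        codec => multiline {
            pattern => "^Spalanzani"
            what => "previous"
            negate => true
            auto_flush_interval => 2
        }
    }
}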

Thanks for the suggestion, I will try it out and come back. However, I did try something similar and was getting a "multiline_codec_max_lines_reached" error; my original XML has 20631298 lines, and it can be even more at times.
Is there a way to remove the limit on the number of lines, as I cannot predict how many lines there will be?

What is the purpose of "auto_flush_interval" here in the codec? Can you please explain this?

After making the suggested change in the codec, and changing the xpath as below:
xpath => [
    "/job_info/queue_info/job_list/@state", "Job_State",
    "/job_info/queue_info/job_list/JB_job_number/text()", "Job_Number",
    "/job_info/queue_info/job_list/JAT_prio/text()", "priority",
    "/job_info/queue_info/job_list/JAT_ntix/text()", "ntix",
    "/job_info/queue_info/job_list/JB_nppri/text()", "nppri",
    "/job_info/queue_info/job_list/hard_req_queue/text()", "hard_req_queue",
    "/job_info/queue_info/job_list/job_binding/text()", "binding",
    "/job_info/job_info/job_list/@state", "Job_State",
    "/job_info/job_info/job_list/JB_job_number/text()", "Job_Number",
    "/job_info/job_info/job_list/JAT_prio/text()", "priority",
    "/job_info/job_info/job_list/JAT_ntix/text()", "ntix",
    "/job_info/job_info/job_list/JB_nppri/text()", "nppri",
    "/job_info/job_info/job_list/hard_req_queue/text()", "hard_req_queue",
    "/job_info/job_info/job_list/job_binding/text()", "binding"
]

When I keep auto_flush_interval => 2, I get data for my <job_list state="running"> tags, as below:
{
"host" => "myelkhost",
"@version" => "1",
"Job_State" => ,
"ntix" => [
[0] "0.00000",
[1] "0.00000"
],
"path" => "/usr/share/logstash/qstat.cache",
"priority" => [
[0] "0.00000",
[1] "0.00000"
],
"hard_req_queue" => [
[0] "all.q",
[1] "all.q"
],
"Job_Number" => [
[0] "133460680",
[1] "140091542"
],
"binding" => ,
"type" => "xml",
"nppri" => [
[0] "0.00000",
[1] "0.00000"
],
"@timestamp" => 2019-03-14T15:34:17.860Z,
"tags" => [
[0] "multiline"
],
"message" => "<?xml version='1.0'?>\n<job_info xmlns:xsd="https://schemas/qstat.xsd">\n <queue_info>\n <job_list state="running">\n <JB_job_number>148635539</JB_job_number>\n <JAT_prio>0.51090</JAT_prio>\n <JAT_ntix>1.00000</JAT_ntix>\n <JB_nppri>0.45090</JB_nppri>\n <hard_request name="mem_avail" resource_contribution="0.000000">400G</hard_request>\n <hard_req_queue>largemem.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n <job_list state="running">\n <JB_job_number>148711350</JB_job_number>\n <JAT_prio>0.51090</JAT_prio>\n <JAT_ntix>1.00000</JAT_ntix>\n <JB_nppri>0.45090</JB_nppri>\n <hard_request name="mem_avail" resource_contribution="0.000000">900G</hard_request>\n <hard_request name="h_rt" resource_contribution="0.000000">518400</hard_request>\n <hard_req_queue>largemem.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n </queue_info>\n <job_info>\n <job_list state="pending">\n <JB_job_number>133460680</JB_job_number>\n <JAT_prio>0.00000</JAT_prio>\n <JAT_ntix>0.00000</JAT_ntix>\n <JB_nppri>0.00000</JB_nppri>\n <hard_request name="mem_avail" resource_contribution="0.000000">1000M</hard_request>\n <hard_req_queue>all.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n <job_list state="pending">\n <JB_job_number>140091542</JB_job_number>\n <JAT_prio>0.00000</JAT_prio>\n <JAT_ntix>0.00000</JAT_ntix>\n <JB_nppri>0.00000</JB_nppri>\n <hard_request name="arch" resource_contribution="0.000000">lx*</hard_request>\n <hard_req_queue>all.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n </job_info>\n</job_info>"

and when I keep auto_flush_interval => , I get data for my <job_list state="pending"> tags, as below:
{
"nppri" => [
[0] "0.00000",
[1] "0.00000"
],
"tags" => [
[0] "multiline"
],
"type" => "xml",
"message" => "<?xml version='1.0'?>\n<job_info xmlns:xsd="https://schemas/qstat.xsd">\n <queue_info>\n <job_list state="running">\n <JB_job_number>148635539</JB_job_number>\n <JAT_prio>0.51090</JAT_prio>\n <JAT_ntix>1.00000</JAT_ntix>\n <JB_nppri>0.45090</JB_nppri>\n <hard_request name="mem_avail" resource_contribution="0.000000">400G</hard_request>\n <hard_req_queue>largemem.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n <job_list state="running">\n <JB_job_number>148711350</JB_job_number>\n <JAT_prio>0.51090</JAT_prio>\n <JAT_ntix>1.00000</JAT_ntix>\n <JB_nppri>0.45090</JB_nppri>\n <hard_request name="mem_avail" resource_contribution="0.000000">900G</hard_request>\n <hard_request name="h_rt" resource_contribution="0.000000">518400</hard_request>\n <hard_req_queue>largemem.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n </queue_info>\n <job_info>\n <job_list state="pending">\n <JB_job_number>133460680</JB_job_number>\n <JAT_prio>0.00000</JAT_prio>\n <JAT_ntix>0.00000</JAT_ntix>\n <JB_nppri>0.00000</JB_nppri>\n <hard_request name="mem_avail" resource_contribution="0.000000">1000M</hard_request>\n <hard_req_queue>all.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n <job_list state="pending">\n <JB_job_number>140091542</JB_job_number>\n <JAT_prio>0.00000</JAT_prio>\n <JAT_ntix>0.00000</JAT_ntix>\n <JB_nppri>0.00000</JB_nppri>\n <hard_request name="arch" resource_contribution="0.000000">lx*</hard_request>\n <hard_req_queue>all.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n </job_info>\n</job_info>",
"@timestamp" => 2019-03-14T15:43:54.868Z,
"host" => "myelkhost",
"@version" => "1",
"Job_State" => [
[0] "pending",
[1] "pending"
],
"binding" => [
[0] "NONE",
[1] "NONE"
],
"path" => "/usr/share/logstash/qstat.cache",
"hard_req_queue" => [
[0] "all.q",
[1] "all.q"
],
"priority" => [
[0] "0.00000",
[1] "0.00000"
],
"ntix" => [
[0] "0.00000",
[1] "0.00000"
],
"Job_Number" => [
[0] "133460680",
[1] "140091542"
]
}

But I do not get data for all four running and pending job_list tags at the same time.

I am running the below command to test my pipeline:

/usr/share/logstash/bin/logstash -f /usr/share/logstash/xml-pipeline.conf_ask --config.reload.automatic --path.settings /etc/logstash --path.data /usr/share/logstash/qcache/data

auto_flush_interval tells the codec that if it does not get a match to the regexp for 2 seconds, it should flush whatever it has read onto the pipeline as an event. When using a pattern that never matches, this is the only way to create an event.
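
On the max-lines error you hit: the multiline codec caps an event at 500 lines by default, which is what adds the "multiline_codec_max_lines_reached" tag. I do not believe it can be made truly unlimited, but you can raise the cap with max_lines (and you would probably need to raise max_bytes as well). A sketch, with the values picked arbitrarily:

codec => multiline {
    pattern => "^Spalanzani"
    what => "previous"
    negate => true
    auto_flush_interval => 2
    max_lines => 30000000    # default is 500
    max_bytes => "1 GiB"     # default is "10 MiB"
}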

If you have a 20 million line file then I suggest ingesting each job_list element separately, as you originally tried to do. Use a codec with what => "next", so that each line matching the pattern closes the accumulated event:

codec => multiline { pattern => "^</job_list>" what => "next" negate => true }

Then modify the message to remove anything you do not need. I would have expected to be able to do this with mutate+gsub, but I could not get it to work, so I used ruby:

    ruby {
        code => '
            # Grab everything from the first <job_list onwards; the /m
            # flag lets . match newlines, so the match runs to the end
            # of the message.
            m = event.get("message").scan(/<job_list.*/m)
            if m == []
                # No <job_list in this chunk (e.g. just trailing
                # closing tags), so drop the event.
                event.cancel
            else
                event.set("message", m)
            end
        '
    }
    xml { source => "message" target => "theXML" store_xml => true }

It can be done with mutate+gsub using

 mutate { gsub => [ "message", "(?m).*(<job_list)", "\1" ] }

((?m) makes . match newlines, so the greedy .* removes everything up to the last <job_list in the event, and the \1 capture puts the tag itself back.)

Thanks for the answer. The mutate filter seems to be working fine and is extracting the tags between the job_list tags, but when I test the pipeline I only get the result for the last job_list tag; all the rest I do not see. Is it overriding? With the sample XML file that I pasted earlier, I ran the below Logstash config:

input
{
    file
    {
        path => "/usr/share/logstash/qstat.cache"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        type => "xml"
        codec => multiline { pattern => "^</job_list>" what => "next" negate => true }
    }
}
filter
{
    mutate { gsub => [ "message", "(?m).*(<job_list)", "\1" ] }

    xml
    {
        source => "message"
        store_xml => false
        xpath => [
            "/job_list/@state", "Job_State",
            "/job_list/JB_job_number/text()", "Job_Number",
            "/job_list/JAT_prio/text()", "priority",
            "/job_list/JAT_ntix/text()", "ntix",
            "/job_list/JB_nppri/text()", "nppri",
            "/job_list/hard_req_queue/text()", "hard_req_queue",
            "/job_list/job_binding/text()", "binding"
        ]
    }
}
output
{ stdout { codec => rubydebug }}

I expected the info for all four job_list tags to be printed to stdout, but I got the output for only the last tag, and only after pressing Ctrl+C. Below is the pipeline run:

/usr/share/logstash/bin/logstash -f /usr/share/logstash/qstat-pipeline.conf --config.reload.automatic --path.settings /etc/logstash --path.data /usr/share/logstash/qcache/data

Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties
[2019-03-17T08:04:05,585][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-03-17T08:04:05,606][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.6.1"}
[2019-03-17T08:04:13,992][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-03-17T08:04:15,323][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x38daa0a4 run>"}
[2019-03-17T08:04:15,525][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-03-17T08:04:15,590][INFO ][filewatch.observingtail ] START, creating Discoverer, Watch with file and sincedb collections
[2019-03-17T08:04:16,260][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
^C[2019-03-17T08:04:49,054][WARN ][logstash.runner ] SIGINT received. Shutting down.
[2019-03-17T08:04:49,370][INFO ][filewatch.observingtail ] QUIT - closing all files and shutting down.
{
"nppri" => [
[0] "0.00000"
],
"hard_req_queue" => [
[0] "all.q"
],
"binding" => [
[0] "NONE"
],
"ntix" => [
[0] "0.00000"
],
"path" => "/usr/share/logstash/qstat.cache",
"message" => "<job_list state="pending">\n <JB_job_number>140091542</JB_job_number>\n <JAT_prio>0.00000</JAT_prio>\n <JAT_ntix>0.00000</JAT_ntix>\n <JB_nppri>0.00000</JB_nppri>\n <hard_request name="arch" resource_contribution="0.000000">lx*</hard_request>\n <hard_req_queue>all.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n </job_info>\n</job_info>",
"priority" => [
[0] "0.00000"
],
"host" => "inn-elk-vm",
"Job_Number" => [
[0] "140091542"
],
"@timestamp" => 2019-03-17T02:34:49.797Z,
"Job_State" => [
[0] "pending"
],
"@version" => "1",
"type" => "xml",
"tags" => [
[0] "multiline"
]
}
[2019-03-17T08:04:51,264][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x38daa0a4 run>"}

Also, when writing to Elasticsearch, only one tag's data is written:

{

"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 1.0,
"hits" : [
{
"_index" : "noida_qstat-2019.03.17.22",
"_type" : "doc",
"_id" : "ext1iWkBTL48HKwyT_vG",
"_score" : 1.0,
"_source" : {
"binding" : [
"NONE"
],
"Job_State" : [
"pending"
],
"@version" : "1",
"type" : "xml",
"tags" : [
"multiline",
"_xmlparsefailure"
],
"host" : "inn-elk-vm",
"path" : "/usr/share/logstash/qstat.cache",
"Job_Number" : [
"140091542"
],
"priority" : [
"0.00000"
],
"ntix" : [
"0.00000"
],
"nppri" : [
"0.00000"
],
"message" : "<job_list state="pending">\n <JB_job_number>140091542</JB_job_number>\n <JAT_prio>0.00000</JAT_prio>\n <JAT_ntix>0.00000</JAT_ntix>\n <JB_nppri>0.00000</JB_nppri>\n <hard_request name="arch" resource_contribution="0.000000">lx*</hard_request>\n <hard_req_queue>all.q</hard_req_queue>\n <job_binding>NONE</job_binding>\n </job_list>\n </job_info>\n</job_info>",
"@timestamp" : "2019-03-17T02:22:38.974Z",
"hard_req_queue" : [
"all.q"
]
}
}
]
}
}

Hi, I was able to parse each job_list element as a separate event by using the below config. What I did was remove all the other tags and keep only the <job_list> tags, as the data I care about is inside those tags.

input
{
    file
    {
        path => "/usr/share/logstash/qstat.cache"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => multiline { pattern => "</job_list>" what => "next" negate => true auto_flush_interval => 10 }
    }
}
filter
{
    mutate { gsub => [ "message", "<job_info.*>", "" ] }
    mutate { gsub => [ "message", "<.xml version.*>", "" ] }
    mutate { gsub => [ "message", "</job_info.*>", "" ] }
    mutate { gsub => [ "message", "<queue_info.*>", "" ] }
    mutate { gsub => [ "message", "</queue_info.*>", "" ] }
    xml
    {
        source => "message"
        remove_namespaces => true
        store_xml => false
        #target => "theXML"
        xpath => [
            "/job_list/@state", "Job_State",
            "/job_list/JB_job_number/text()", "Job_Number",
            "/job_list/JAT_prio/text()", "priority",
            "/job_list/JAT_ntix/text()", "ntix",
            "/job_list/JB_nppri/text()", "nppri",
            #"/job_list/hard_request[@name]", "hard_request",
            "/job_list/hard_req_queue/text()", "hard_req_queue",
            "/job_list/job_binding/text()", "binding"
        ]
    }
    mutate {
        rename => [
            "[Job_State][0]", "Job_State",
            "[Job_Number][0]", "Job_Number",
            "[priority][0]", "priority",
            "[ntix][0]", "ntix",
            "[nppri][0]", "nppri",
            "[hard_req_queue][0]", "hard_req_queue",
            "[binding][0]", "binding"
        ]
    }
}
output
{
    stdout { codec => rubydebug }
}
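
Note: the mutate/rename block at the end is needed because xpath always stores its results as arrays (as you can see in the earlier outputs); renaming [field][0] to field flattens each single-element array into a plain scalar value.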

Thanks for your assistance all the way on this!
