Issue with persistent queue: not polling high-volume records

Hi all

I am using a conf file whose input flushes 80K records per read, which are then written to disk.

My PQ settings:

queue.page_capacity: 64mb
queue.max_bytes: 1gb
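
(For context: these two settings only take effect when the queue type is set to persisted; a minimal logstash.yml sketch using the values from this thread:)

# logstash.yml -- persistent queue (sketch)
queue.type: persisted        # enable the persistent queue (default is "memory")
queue.page_capacity: 64mb    # size of each page file written to disk
queue.max_bytes: 1gb         # total disk space the queue may use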

input {
  http_poller {
    urls => {
      http_request => {
        method => get
        url => "http://xyzfmt=CSV"
      }
    }
    codec => "json"
  }
}
filter {
  split {
    field => "message"
  }
  csv {
    columns => ["Org", "Network", ......................................"Brand"]
    separator => ","
  }
  mutate {
    remove_field => ["message", "tags"]
  }
}


output {
  stdout { codec => rubydebug }
  elasticsearch {
  }
}

After reading the input I get 80K records; the pipeline does the split operations on all of them, and I have to wait 4 hrs.

Why can't I do the filter operations on 20 records --> output those 20 records,
then in the second batch
do the filter operations on another 20 records --> output those 20 records?

Is there a setting in the persistent queue where I can define posting my records in batches of 20/50/100?

Currently it does the split operation on all 80K records, which takes around 4 hrs, and only then starts posting records to the output, which is a very slow process.

Because of this I have to wait 4 hrs to see records in the output. Can't we start seeing at least the first batch of 20 records in the output within a couple of minutes, instead of waiting 4 hrs to check the output?
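
(For reference: the persistent queue itself has no batch-size setting; how many events a worker pulls from the queue per batch is controlled by pipeline settings in logstash.yml. A sketch, with the documented defaults shown as illustrative values:)

# logstash.yml -- batch settings (sketch; values are the documented defaults, not from this thread)
pipeline.batch.size: 125    # events each worker thread collects from the queue per batch
pipeline.batch.delay: 50    # ms to wait for a full batch before dispatching a partial one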

I think this is a very common issue; has anyone tried any other ways to solve this?

Do you have multiple pipeline worker threads? With 2 threads, I can ingest an array of 80,000 objects and the second thread starts processing events being split off by the first thread after about 17 seconds.

Thanks for your reply. This is how my output looks.
Do you mean I should increase the number of workers, and that will solve my problem?

output {
  stdout { codec => rubydebug }
  elasticsearch {
    action => "index"
    hosts => ""
    index => "abc"
    workers => 1
  }
}

You would change it in logstash.yml. If you are running on a single CPU machine then the documentation says it would default to a single pipeline worker. To me it appears that that would require the 80,000 events to be persisted before any of them can be processed. Setting

pipeline.workers: 2

may help.
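
(For completeness, the same setting can also be passed on the command line with the -w flag; a sketch, with the config file path as a placeholder:)

bin/logstash -w 2 -f /path/to/pipeline.conf    # -w sets pipeline.workers for this run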

Hi @Badger
After your suggestion I checked the documentation as well; it claims filter and output will execute in parallel.
I set my workers first to 2 and then to 4.
I tried it with the yml file settings as well as on the command prompt with -w4.

In both cases the split event just goes on and on. Does this really work?

I waited 5-10 minutes in each case, but the index was never created.

Do you have any other suggestions?

Thanks

That's true, but it was not what I was concerned about. If you only have one worker thread then I would expect that it will be working away in the .each loop in the split, leaving no thread available to process the events that it is emitting.

I would check the hot threads API to see what the workers are doing. Check the logs to see if there are any messages that suggest problems (e.g. excessive GC).
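
(For reference, the hot threads report comes from the Logstash monitoring API, which listens on port 9600 by default; a sketch:)

curl -XGET 'http://localhost:9600/_node/hot_threads?human=true'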

Hi @Badger
In the log file I don't see any issue; it is doing the split operations fine and they just go on and on.
I checked with the hot threads API and I see 2 threads are waiting.
Where should I start with that?

"hot_threads" : {
"time" : "2019-02-06T02:41:50+05:30",
"busiest_threads" : 3,
"threads" : [ {
"name" : "LogStash::Runner",
"thread_id" : 1,
"percent_of_cpu_time" : 20.34,
"state" : "timed_waiting",
"traces" : [ "java.lang.Object.wait(Native Method)", "java.lang.Thread.join(Thread.java:1253)", "org.jruby.internal.runtime.NativeThread.join(NativeThread.java:76)" ]
}, {
"name" : "Ruby-0-Thread-9@[main]>worker3",
"thread_id" : 28,
"percent_of_cpu_time" : 12.26,
"state" : "runnable",
"path" : "/lib/logstash/pipeline.rb:383",
"traces" : [ "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:139)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:145)", "F_3a_.logstash_minus_6_dot_3_dot_2.logstash_minus_core.lib.logstash.filters.base.invokeOther35:each(/lib/logstash/filters/base.rb:186)" ]
}, {
"name" : "[main]-pipeline-manager",
"thread_id" : 18,
"percent_of_cpu_time" : 0.54,
"state" : "timed_waiting",
"traces" : [ "java.lang.Object.wait(Native Method)", "java.lang.Thread.join(Thread.java:1253)", "org.jruby.internal.runtime.NativeThread.join(NativeThread.java:76)" ]
} ]
}
}

Hi @Badger

I tried various options like

pipeline.workers: 2
pipeline.output.workers: 2

But the pipeline still does the split job first and only then goes to the output, which means a huge 4 hrs of filter operations before anything is pushed to the output.

I have pasted the hot threads output in my message above.

Do you have any suggestions for me?

No, I cannot think of anything else.

@Christian_Dahlqvist
Hi, do you have any idea about this?
This is part of the documentation, but it does not work as expected. I think this is a common problem with high-volume data, so how can we ingest data into the output faster?
Any suggestions would be very helpful.

I have never used http_poller to retrieve large volumes of data, but I am not surprised it is slow, as it likely has a lot of JSON parsing to do in a single thread. Is there no other way to get the data into the system?

Oh okay, I thought http_poller was meant to retrieve responses. Even if I have only 5000 records it will still take a lot of time to produce output: first it completes the split operation on all 5000 records and only then starts flushing the output.
I am sure this is a common case with every input except file inputs, because the file input uses sincedb files.

Moreover, I am surprised by the documentation, where it is written that the filter and output will work together once we define pipeline.workers with a value greater than 1, so that it does parallel operations.
From this I assumed one thread would do the split operation while the other thread starts flushing the output.

Without looking at the code I suspect the whole payload will need to be parsed before the split is performed, and this likely happens in a single thread. Once events have been generated it is all multi-threaded, but I would not be surprised if this initial processing is what is limiting throughput.

Once I get the response from the input it is essentially JSON data, stored in a "message" field.
After that the filter comes into the picture; now I feel the split filter does not have the capability to do multi-threading.

Can you help me understand what you mean by "parsed before split"? I am doing the whole parsing using the split filter.
What else can I do before the split filter?

Correct. Split is single threaded. (As is the json codec on the input.)
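
(One possible workaround, sketched here purely as an assumption rather than a tested fix: if the endpoint can return newline-delimited CSV, the line codec emits one event per line, so neither the json codec nor the split filter is needed and events can reach the worker threads immediately. The URL is the redacted one from this thread and the column list is an illustrative subset.)

input {
  http_poller {
    urls => {
      http_request => {
        method => get
        url => "http://xyzfmt=CSV"     # assumed to return newline-delimited CSV
      }
    }
    codec => "line"                    # one event per response line instead of one big JSON message
  }
}
filter {
  csv {
    columns => ["Org", "Network", "Brand"]   # illustrative subset of the real column list
    separator => ","
  }
}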

Okay, but I feel this must be a common problem across data source types.
When you take JSON data in bulk, it all goes into one big message.

How are other people in the community handling it? I feel this is a very common issue, as nowadays almost all data is in JSON format.


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.