Logstash: improve upload time performance

Hi all,
I have a CSV file that contains 4 million lines, each with more than 100 columns.
I process about 30 of the columns and add another 10, so in total I index about 40 columns into Elasticsearch (viewed in Kibana). On average the upload takes 14 minutes.
To improve this I tried tuning the following (example values for these settings are shown below):

  1. pipeline.workers
  2. pipeline.batch.size / pipeline.batch.delay
  3. JVM Xmx and Xms
    but the upload time only increased
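For reference, these settings live in logstash.yml (the heap is set in jvm.options); the values below are only examples, not the exact values I tried:

  pipeline.workers: 4         # defaults to the number of CPU cores
  pipeline.batch.size: 1000   # events per worker per batch, default is 125
  pipeline.batch.delay: 50    # ms to wait for a full batch, default is 50
  # -Xms and -Xmx are set in config/jvm.options, not in logstash.yml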

Any ideas?
Thanks in advance

Where are you sending the data? Are you sure Logstash is limiting performance and not the destination system? If so, how did you establish this?

Logstash, Elasticsearch and Kibana are installed on the same Linux machine.
The network speed is around 20 Mbps.

What does CPU usage, disk I/O and iowait look like while indexing?

[screenshots of CPU usage and disk I/O during indexing were attached here]

@magnusbaeck
@paz

Any ideas?
Thanks

What does your Logstash pipeline look like?

input {
file {
path => "/var/log/file"
start_position => "beginning"
}
}
filter {
csv {
separator => " "
columns => ["NODE_ID", "MSISDN", "TRANSACTION_URL", "RESPONSE_CODE", "START_TIME", "END_TIME", "RESPONSE_ORIG_SIZE",
"DOWNLOAD_DATA_SIZE","SRC_IP", "ORIG_DST_IP", "USED_DST_IP", "SGSN_IP", "ACC_SESSION_ID", "IMEI2", "USER_AGENT",
"COMPRESSION_LVL", "CONTENT_TYPE", "UPSTREAM_SIZE","DOWNSTREAM_SIZE", "OPTIMIZATION_FLAG", "CA_FLAG", "CC_FLAG",
"CC_BLOCKED", "CC_REASON", "CC_CATEGORY", "INITIAL_BEARER", "NETWORK_BEARER","APN_IP1", "POC_HIT_FLAG", "IMSI",
"ORIGINAL_REQUEST_HOST", "ORG_CONTENT_LENGTH", "CONTENT_LENGTH", "MD5_STRING", "ORG_BITRATE", "BITRATE",
"IS_BUFFER_LIMITING", "IS_FROM_CACHE", "IS_SEEK_REQUEST", "AUDIO_CODEC", "VIDEO_CODEC", "MEDIA_CONTAINER",
"CACHED_EXTRA_BYTES_LENGTH", "PARENT_MSISDN","SEND_NOTIFY", "IS_CACHED", "IS_TO_CACHE", "TRANSCODING_LEVEL",
"HTTP_METHOD", "NETWORK_RATE", "ORG_AUDIO_CODEC", "ORG_VIDEO_CODEC","CONTENT_ENCODING_HEADER", "TRANSFER_ENCODING_HEADER",
"MODIFIED_CONTENT_TYPE", "FULL_RESOURCE_CONTENT_LENGTH", "RANGE_REQUEST_HEADER", "CACHE_HITS","TOOLBAR_FLAG",
"HPI_CACHE_PUT_TIME", "HITS_TM_GET", "TM_IN_TIME", "TM_OUT_TIME", "MEDIA_HANDLING_TYPE", "MEDIA_QUALITY_LEVEL",
"VIDEO_START_TIME","NUMBER_OF_STALLS", "AVERAGE_STALLS_LENGTH", "NUMBER_OF_DRA_DECREASES", "HPI_INSTANCE", "UIDH",
"WEB_RESPONSE_CODE", "WEB_ROUND_TRIP_TIME","CLIENT_ROUND_TRIP_TIME", "HTTP_REQUEST_VERSION", "HTTP_RESPONSE_VERSION",
"DNS_RESPONSE_TIME", "DOWNLOAD_ORIG_SIZE", "IS_FROM_TUNNEL","TOOLBAR_REQUEST_TYPE", "REQUEST_MODIFICATION_INDICATION",
"ORIGINAL_TARGET_URL", "PROFILE_TEMPLATE", "ADDITIONAL_INFO", "CLIENT_CONNECTION_TTL","ICL_REASON", "MLT_LEVEL_REASON",
"OT_LEVEL_REASON", "DRA_LEVEL_REASON", "ONLINE_DRA_LEVEL_REASON", "CELL_ID", "RESOLUTION_HEIGHT", "RESOLUTION_WIDTH",
"PACING_HANDLER", "PACING_FACTOR", "CONGESTION_LEVEL", "CONNECTION_BW", "CONNECTION_RTT", "IS_NAT", "OPTIMIZATION_STATUS",
"IS_CELL_CONGESTED","SESSION_DURATION_REMAINDER", "COMPLETED_DOWNLOAD", "ORIG_DST_PORT", "ORIG_SRC_PORT", "FIN_DURATION",
"ABM_INFO", "CIRCLE_MAPPING", "BACKHAUL_RTT"] #BACKHAUL_RTT is the 109 column
}
}
filter{
mutate{
convert => {
"DOWNSTREAM_SIZE" => "integer"
"RESPONSE_ORIG_SIZE" => "integer"
"UPSTREAM_SIZE" => "integer"
"SESSION_DURATION_REMAINDER" => "integer"
}
}
if ![DOWNSTREAM_SIZE] or ![RESPONSE_ORIG_SIZE] or ![UPSTREAM_SIZE] or ![SESSION_DURATION_REMAINDER]{
drop { }
}else if [DOWNSTREAM_SIZE] < 0 or [RESPONSE_ORIG_SIZE] < 0 or [UPSTREAM_SIZE]<0{
drop { }
}else if [RESPONSE_ORIG_SIZE] < [DOWNSTREAM_SIZE] {
ruby {
code => "event.set('RESPONSE_ORIG_SIZE', event.get('DOWNSTREAM_SIZE'))"
}
}
}
filter{
ruby {
code => "
wanted_fields = ['NODE_ID', 'MSISDN', 'TRANSACTION_URL', 'RESPONSE_CODE', 'START_TIME', 'END_TIME', 'RESPONSE_ORIG_SIZE',
'SRC_IP','USER_AGENT','CONTENT_TYPE','UPSTREAM_SIZE','DOWNSTREAM_SIZE','CC_CATEGORY','NETWORK_BEARER',
'POC_HIT_FLAG','BITRATE','IS_FROM_CACHE','VIDEO_CODEC','MEDIA_CONTAINER','MEDIA_HANDLING_TYPE',
'VIDEO_START_TIME','NUMBER_OF_STALLS','AVERAGE_STALLS_LENGTH','CLIENT_ROUND_TRIP_TIME','PROFILE_TEMPLATE',
'ADDITIONAL_INFO','CELL_ID','RESOLUTION_HEIGHT','RESOLUTION_WIDTH','CONGESTION_LEVEL','CONNECTION_RTT',
'SESSION_DURATION_REMAINDER','ABM_INFO','BACKHAUL_RTT','@timestamp','tags']
event.to_hash.keys.each { |k|
event.remove(k) unless wanted_fields.include? k
}
"
}
}
filter{
mutate{
convert => {
"POC_HIT_FLAG" => "integer"
"IS_FROM_CACHE" => "integer"
"VIDEO_START_TIME" => "integer"
"BITRATE" => "integer"
"CLIENT_ROUND_TRIP_TIME" => "integer"
"CONNECTION_RTT" => "integer"
}
}
if ![POC_HIT_FLAG] or ![IS_FROM_CACHE]{
ruby {
code => "event.set('IS_FROM_CACHE',0)"
}
}
if [IS_FROM_CACHE] > 0 and ([POC_HIT_FLAG] == 2 or [POC_HIT_FLAG] == 3) {
ruby {
code => "event.set('IS_FROM_CACHE',1)"
}
}else{
ruby {
code => "event.set('IS_FROM_CACHE',0)"
}
}
if ![VIDEO_START_TIME]{
ruby {
code => "event.set('VIDEO_START_TIME',0)"
}
}
if [VIDEO_START_TIME]<0 or [VIDEO_START_TIME] > 10000 {
ruby {
code => "event.set('VIDEO_START_TIME',0)"
}
}
if ![BITRATE]{
ruby {
code => "event.set('BITRATE',0)"
}
}
if [BITRATE]<0 {
ruby {
code => "event.set('BITRATE',0)"
}
}
if !([SESSION_DURATION_REMAINDER]){
ruby {
code => "event.set('SESSION_DURATION_REMAINDER',0)"
}
}
if ![CLIENT_ROUND_TRIP_TIME] or ![CONNECTION_RTT]{
ruby {
code => "event.set('TTFB',0)"
}
}else if [CLIENT_ROUND_TRIP_TIME]<=0 or [CONNECTION_RTT]<=0 or [CLIENT_ROUND_TRIP_TIME] >= 5000 or [CONNECTION_RTT] >= 5000 {
ruby {
code => "event.set('TTFB',0)"
}
}else{
ruby {
code => "(event.get('CLIENT_ROUND_TRIP_TIME') + event.get('CONNECTION_RTT')) < 5000 ? event.set('TTFB', event.get('CLIENT_ROUND_TRIP_TIME') + event.get('CONNECTION_RTT')) : 0"
}
}
mutate {
remove_field => ["POC_HIT_FLAG","CLIENT_ROUND_TRIP_TIME","CONNECTION_RTT"]
}
}
filter {
grok {
match => {
"BACKHAUL_RTT" => [
"%{NUMBER:BACKHAUL_RTT_MIN:float}%{NOTSPACE}%{NUMBER:BACKHAUL_RTT_AVG:float}%{NOTSPACE}%{NUMBER:BACKHAUL_RTT_MAX:float}"
]
}
}
mutate{
convert => {
"BACKHAUL_RTT_MIN" => "integer"
"BACKHAUL_RTT_AVG" => "integer"
"BACKHAUL_RTT_MAX" => "integer"
}
}
if ![BACKHAUL_RTT_MIN] or ![BACKHAUL_RTT_AVG] or ![BACKHAUL_RTT_MAX]{
ruby {
code => "event.set('BACKHAUL_RTT_MIN',-1 )
event.set('BACKHAUL_RTT_AVG',-1 )
event.set('BACKHAUL_RTT_MAX',-1 )"
}
}
if [BACKHAUL_RTT_MIN] < 0 or [BACKHAUL_RTT_AVG] < 0 or [BACKHAUL_RTT_MAX] < 0{
ruby {
code => "event.set('BACKHAUL_RTT_MIN',-1 )
event.set('BACKHAUL_RTT_AVG',-1 )
event.set('BACKHAUL_RTT_MAX',-1 )"
}
}
mutate {
remove_field => ["BACKHAUL_RTT"]
}
}
filter {
if !([START_TIME]) {
ruby {
code => "event.set('START_TIME','0')
event.set('START_TIME_MILL',0)"
}
}else{
grok {
match => {
"START_TIME" => [
"%{YEAR:YEAR:int}%{MONTHNUM2:MONTHNUM2:int}%{MONTHDAY:MONTHDAY:int}%{MINUTE:HOUR1:int}%{MINUTE:MINUTE1:int}%{SECOND:SECOND:int}"
]
}
}
if !([HOUR1]) or !([MINUTE1]) or !([SECOND]) {
ruby {
code => "event.set('START_TIME_MILL', 0 )
event.set('START_TIME', '0' )"
}
}else{
ruby {
code => "event.set('START_TIME_MILL',event.get('HOUR1')*3600000+event.get('MINUTE1')*60000+event.get('SECOND')*1000)"
}
}
mutate {
remove_field => ["YEAR","MONTHNUM2","HOUR1","MINUTE1","SECOND"]
}
}
}
output {
if "_rubyexception" in [tags] {
file {
path => "var/log/logstash/errors.log"
}
}
elasticsearch
{
hosts => ["localhost:9200"]
}
}

I could not post the whole configuration; it is about 22,000 characters with 29 filters.
Here you can see only 6 of the filters.

That is a lot of ruby scripts, some of which look unnecessary. I am not sure how the performance of the ruby filter compares to other filters, but it might be worth replacing some of the ruby filters to see if that makes a difference.

ruby {
  code => "event.set('START_TIME','0')
    event.set('START_TIME_MILL',0)"
}

should as far as I can see be equivalent to:

mutate {
  add_field => {
      "START_TIME" => 0
      "START_TIME_MILL" => 0
    }
}
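
In the same way, the blocks that default a numeric field to zero could probably be collapsed into one conditional around a mutate filter instead of ruby. A sketch only (mutate writes string values, so keep the convert if the type matters):

  if ![BITRATE] or [BITRATE] < 0 {
    mutate {
      # replace sets the field, creating it if it does not exist
      replace => { "BITRATE" => "0" }
    }
  }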

Given the size of the config there may be other things that you may be able to clean up as well in order to improve performance.
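
For example, the ruby script that strips each event down to a whitelist of fields might be replaceable with the prune filter. A sketch with only a few of the wanted fields shown; whitelist_names takes regular expressions, hence the anchors:

  prune {
    # keep only the whitelisted fields (patterns are regexes, so anchor them for exact matches)
    whitelist_names => ["^NODE_ID$", "^MSISDN$", "^RESPONSE_CODE$", "^@timestamp$", "^tags$"]
  }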


What's taking the most time in your pipeline? Hot threads or pipeline stats API should shed some light on that.

As Christian said, there's a lot of repetition in the mutate/ruby filters that could probably be nested better.
Also, try using Dissect instead of the CSV filter; you should see some performance improvement.
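
Something like this, assuming the lines really are separated by single spaces; dissect splits purely on the delimiters (no regular expressions, which is why it tends to be faster than csv/grok), so the real mapping has to list all 109 columns in order. Only the first few are shown here, and %{rest} is just a placeholder for the remaining columns:

  dissect {
    mapping => {
      "message" => "%{NODE_ID} %{MSISDN} %{TRANSACTION_URL} %{RESPONSE_CODE} %{START_TIME} %{END_TIME} %{rest}"
    }
  }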


Thank you paz and @Christian_Dahlqvist for your replies.
I changed my configuration to use Dissect as you suggested; this reduced the upload time from 14 minutes to 10 minutes.

Thanks

After manipulating the mutate/ruby filters, I can tell that there is no difference in performance.
