Multiline vs aggregate

I have a log file with this structure:

ID,Tag,Value
10179265,37,21
10179265,2001001,15020737
10179265,2003001,0
10179265,2005000,ARM_OPERATOR
10179265,2005002,Int_OPERATOR
10160893,37,21
10160893,2001001,15054905
10160893,2001003,VOICE
10160893,2003000,0
10160893,2003001,0
10179296,2004306,20171005130229
10179296,2005000,
10179296,2005001,Roam_OPERATOR
10179296,2005002,Int_OPERATOR
10179281,37,21
10179281,2001001,15038876

The file has about 40 million rows daily.
I collect the files from production with Filebeat.

I need to merge all lines with the same ID into one record;
all lines for a given ID are always stored together (consecutively).

The question is:
how do I create a multiline configuration pattern to match lines with the same ID?

Or would it be a better solution to use the aggregate filter in Logstash?

With the aggregate filter, my concern is the requirement to set pipeline workers to 1.

thank you

While writing this question I found that the pattern 37,21 marks the start of each record, so I can use it to identify the start of a multiline event. This works now:

multiline.pattern: 37,21$
multiline.negate: true
multiline.match: after
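
For reference, these options sit under the prospector in filebeat.yml, roughly like this (the path is just an example):

filebeat.prospectors:
- input_type: log
  paths:
    - /path/to/daily/*.csv
  # start a new event on each line ending in 37,21; append the rest
  multiline.pattern: '37,21$'
  multiline.negate: true
  multiline.match: after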

I receive:

"message" => "111222,37,21\n111222,2001001,15037925\n111222,2001003,VOICE\n111222,2003000,0"

Now the issue is the Logstash part:

filter {
  csv {
    columns => ["ID","Tag","Value"]
  }
}

But how do I handle the \n newlines?

Looking at your sample, 37,21 doesn't look like a good pattern:

e.g.:

10160893,37,21
...
10179296,2004306,20171005130229
10179296,2005000,
10179296,2005001,Roam_OPERATOR
10179296,2005002,Int_OPERATOR

This doesn't look like a simple multiline problem. Plus, you want to see all tags/values for an ID.

At first I'd parse each line into an event of type {"id": ..., "tag": ..., "value": ...}.
TBH I think that's good enough. By searching for an ID I can immediately see each tag/value. By searching for a tag I can see each ID having said tag. This is great for building visualisations as is.

If you still want to combine events, such that tag and value become arrays you will have to use the aggregate filter after dissect/grok (e.g. combine until ID changes). An event will become {"id": ..., "tags": [ ... ], "values": [ ... ]}.
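
An untested sketch of that combination (field names follow the dissect example above; push_previous_map_as_event assumes all lines of an ID arrive back to back, which also means running the pipeline with a single worker):

filter {
  # split each CSV line into id, tag, value
  dissect {
    mapping => { "message" => "%{id},%{tag},%{value}" }
  }

  # collect tags/values per id; emit the map as an event when a new id starts
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] ||= event.get('id')
      (map['tags'] ||= []) << event.get('tag')
      (map['values'] ||= []) << event.get('value')
      event.cancel()   # drop the per-line event, keep only the aggregate
    "
    push_previous_map_as_event => true
    timeout => 120
  }
}

The timeout just flushes the last map once no more lines arrive for a while.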


Thank you @steffens, great answer.

My concern is related to storage consumption by metadata.
The daily file has 30 million rows, and one ID is represented by 5-20 rows.
Without aggregation, each row carries a lot of metadata
(source, version, timestamp, beat.hostname, beat.name, ID, tags, type, host)
while the payload is just two numbers, e.g. {37=21}.

Do you think it is a good approach to keep it as is and not aggregate?
The reason for aggregation was to save storage and make things faster.

There are fields which are not important, and I need to index only some of them. But I do not want to remove all the other fields, as I may need them in the future.

My idea was to go from this:

10179296,2004306,20171005130229
10179296,2005000,
10179296,2005001,Roam_OPERATOR
10179296,2005002,Int_OPERATOR

to get something like:

message="10179296|2004306=20171005130229|2005000=Test|2005001=Roam_OPERATOR|2005002=Int_OPERATOR"

and parse only selected fields. For instance:

kv {
  allow_duplicate_values => false
  field_split => "|"
  include_keys => ["2001003","2004305","2005000","2005001","2005002"]
}

This gives me something like the following (a different row, but just for reference, to show how many fields would otherwise cost several documents):

"ID" => "543567454321",
"2004305" => "A",
"2005000" => "POST24",
"2005001" => "ET",
"2005002" => "IntPOST24",
"2001003" => "DATA",
"message" => "37=21|2001001=83367495|2001003=DATA|2003000=0|2003001=1|2003002=0|2003003=2|2003004=4|2003005=4|2003006=5|2003007=5|2003008=4|2003009=3|2003010=3|2003011=3|2003012=3|2003013=2|2003014=2|2003015=1|2003017=0|2004305=A|2004306=20190223164230|2005000=POST24|2005001=ET|2005002=IntPOST24|2006505=1812|2006506=75000000|2006507=600000000",
"offset" => 60557684,
"@timestamp" => 2019-01-07T15:38:10.516Z,

Thank you for any advice

Regarding storage, you should do some testing to see whether it really is an issue or some form of premature optimization. I guess the less cardinality a field has, the better it will compress with doc values, which is kind of a way to store documents in columnar fashion.

One can also disable _source to reduce disk usage as well.
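
E.g. this is an index mapping setting, roughly like the following (index and type names are placeholders); keep in mind that without _source you lose features such as reindex-from-source and updates:

PUT my_index
{
  "mappings": {
    "doc": {
      "_source": {
        "enabled": false
      }
    }
  }
}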

I don't know much about your data or how the data will be consumed, which makes it hard for me to make a recommendation. But the way you describe it kinda makes sense as well. I guess it also depends on how you want to query the data. Given the pattern you described, I'd use dissect + a ruby filter, followed by an aggregate filter.

e.g. 10179296,2004306,20171005130229
dissect: {"id": 10179296, "key": 2004306, "value": 20171005130229}
ruby: {"id": 10179296, "2004306": 20171005130229}
aggregate: {"id": 10179296, "2004306": 20171005130229, "2005000": "Test", ...}

While Filebeat has dissect support, the 'key' transformation and the aggregation need to be done in Logstash.


Thank you, I plan to test the Logstash aggregate filter, but meanwhile I did a comparison of the single-line vs multiline option with Filebeat.
A single ID has 20 rows on average, which means inserting 20 rows vs 1 row (using multiline aggregation in Filebeat).

The result was:

option   rows     size     index time
Single   3.8M     1.42GB   6.8m
Multi    74.3M    11.9GB   1.8h

SINGLE Line Option

$ cat 230DEV_SINGLE.conf
input {
  beats {
    port => 5000
  }
}

filter {
  csv {
    columns => ["ID","Key","Value"]
    skip_header => true
  }
}

output {
elasticsearch { ...

MULTILINE Option

$ cat 230DEV_MULTI.conf
input {
  beats {
    port => 5000
  }
}

filter {
  grok {
    match => { "message" => "(?<ID>(120|121)[0-9]{9})" }
  }

  # modify message to remove the ID and create Tag=Value pairs (KV style)
  mutate {
    gsub => [
      "message", "(120|121)[0-9]{9}", "",
      "message", "\n,", "|",
      "message", "^,", "",
      "message", ",", "="
    ]
  }

  # parse message as KV, index only the interesting keys
  kv {
    allow_duplicate_values => false
    field_split => "|"
    include_keys => ["2001003","2004305","2005000","2005001","2005002"]
  }
}

output {
elasticsearch { ...

I plan to test aggregation in Logstash as you recommended to see the results.
Thank you

Hi Steffens, can you please help me write the ruby code
to go from the suggested dissect output: {"id": 10179296, "key": 2004306, "value": 20171005130229}
and transform the value of key into a key and value into its value:
ruby: {"id": 10179296, "2004306": 20171005130229}

thank you

See ruby filter docs and Event API docs.

One could define the ruby filter like this (without testing):

filter {
  ...
  ruby {
    code => "event.set(event.get('key'), event.get('value'))"
    remove_field => ["key", "value"]
  }
 
  ...
}

The remove_field setting removes the original key and value fields from the event, if the code itself did not fail.


Excellent, thank you.
It was the first time I used ruby code. The Ruby API reference doc was the piece I couldn't find. Now it is clear.

Finally I incorporated the ruby part into the aggregation. Thanks @steffens for the help.
Index time for single-line insert vs multiline (Filebeat) vs Logstash aggregation is:

option                  index        rows        size    index time  insert time
{logstash aggregation}  aggreg_test  3804206     1.7gb   1.8h        3h 20min
{all lines, no aggreg}  single_test  74326466    23.8gb  12.4m       2h 45min
{multiline filebeat}    multi_test   3802915     2.8gb   6.8m        14min

Aggregation provides the most information (everything is indexed) with the least storage; however, it keeps Logstash busy the longest: more than 3 hours to insert the same file into Elasticsearch, compared to 14 minutes for the multiline option.

Is the aggregation I did correct? Do you have any idea why it is so slow?

Aggregation:
filter {
  dissect {
    mapping => { "message" => "%{ID},%{key},%{value}" }
  }

  aggregate {
    task_id => "%{ID}"
    code => "
      map['ID'] = event.get('ID')
      map['profile'] ||= []
      map['profile'] << {event.get('key') => event.get('value')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 0
    #remove_field => ["key", "value"]
  }
}

This aggregation produces the following result:

@timestamp   February 6th 2019, 15:42:01.095
@version     1
ID           3201235446654
_id          ea5Cw2gBEQ455fveKMOJ
_index       aggreg_test
_score       -
_type        doc
profile      [{"37": "21"}, {"2001001": "83793503"}, {"2003000": "0"}, {"2003001": "1"},
              {"2003002": "0"}, {"2003003": "2"}, {"2003004": "4"}, {"2003005": "4"},
              {"2003006": "5"}, {"2003007": "5"}, {"2003008": "4"}, {"2003009": "3"},
              {"2003010": "3"}, {"2003011": "3"}, {"2003012": "3"}, {"2003013": "2"},
              {"2003014": "2"}, {"2003015": "1"}, {"2003017": "0"}, {"2004305": "A"},
              {"2004306": "20190212154008"}, {"2005000": "Na_Feature"}, {"2005001": "ET"},
              {"2005002": "IntNa_Feature"}, {"2006505": "1812"}, {"2006506": "72000000"},
              {"2006507": "600000000"}, {"2020151": "0"}, {"2020342": "-2147483647"}]

I'd say the aggregate filter looks fine. Unfortunately I don't have enough experience with this filter to reason about CPU/memory overhead.

The aggregated events are somewhat more complex as well. This means: more overhead in serializing, deserializing, indexing. Maybe you should also measure the throughput without ES. For example:

output {
  stdout {
    codec => dots
  }
}

This will print a dot per event (no encoding). Running LS with bin/logstash | pv -War >/dev/null will print the event throughput.

How long does it take to process the files this way?

@steffens
Thank you so much for all the help.
The last issue I have is the Filebeat -> Logstash and single-worker configuration.

My idea is to create 2 pipelines in Logstash: the first for all existing logs and the second for logs that require aggregation (single worker).

logstash]$ less pipelines.yml

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: singleworker
  path.config: "/etc/logstash/singleworker.d/*.conf"
  pipeline.workers: 1

But I need to configure Filebeat accordingly:

filebeat.prospectors:
- input_type: log
  paths:
    - /intPROD_PI/*.json
  tags: ["prod"]
  close_eof: true
  clean_removed: true
  enabled: true
  close_*: true
  harvester_limit: 5

- input_type: log
  paths:
    - /intPROD_OSD/*.json
  tags: ["prod"]
  close_eof: true
  clean_removed: true
  enabled: true
  close_*: true
  harvester_limit: 5

output.logstash:
  hosts: ["10.0.0.38:5000", "10.0.0.39:5000", "10.0.0.40:5000"]
  loadbalance: true

" HERE I NEED TO ADD IF LOG NEEDS AGGREGATION SEND IT TO DIFFERENT PIPELINE

- input_type: log
  paths:
    - /forAGGREGATION/*.csv
  tags: ["singleworker"]
  close_eof: true
  clean_removed: true
  enabled: true
  close_*: true
  harvester_limit: 5

output.logstash:
  hosts: ["10.0.0.38:5001", "10.0.0.39:5001", "10.0.0.40:5001"]
  loadbalance: true

Please, what is the best practice to configure Filebeat -> Logstash in my case?
Filebeat collects n + m logs, where the m logs need aggregation.
Logstash receives x + y logs from multiple Filebeats, where the y logs need aggregation.

Is my idea above with 2 pipelines and 1 Filebeat correct?
How do I write an if statement in Filebeat?

We have the latest Logstash, 6.6.0,
but Filebeat is 5.4 (compiled on SPARC/Solaris).

thank you

You will have to do the event routing in Logstash using the pipeline output. See: https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html
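
An untested sketch of what that could look like (pipeline names are just examples; the routing condition reuses the "singleworker" tag you already set in Filebeat). In pipelines.yml:

- pipeline.id: intake
  config.string: |
    input { beats { port => 5000 } }
    output {
      # route events tagged in Filebeat to the single-worker pipeline
      if "singleworker" in [tags] {
        pipeline { send_to => ["aggregation"] }
      } else {
        pipeline { send_to => ["main"] }
      }
    }

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: aggregation
  pipeline.workers: 1
  path.config: "/etc/logstash/singleworker.d/*.conf"

The downstream configs then read from the internal address instead of beats, e.g. input { pipeline { address => "aggregation" } } in the aggregation pipeline and input { pipeline { address => "main" } } in the main one.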

Assuming each pipeline has its own ES (output) like:

                         +----------------------------+
                         | +-----------+   +--------+ |
              +----------> |LS (single)|   |ES (out)| +---------+
              |          | +-----------+   +--------+ |         |
+--+     +----+--+       +----------------------------+         v
|FB+---->+LS (in)|                                            +-++
+--+     +----+--+                                            |ES|
              |          +----------------------------+       +-++
              |          | +----------+    +--------+ |         ^
              +--------->+ |LS (multi)|    |ES (out)| +---------+
                         | +----------+    +--------+ |
                         +----------------------------+

This approach has 2 potential disadvantages, which very much depend on the actual logs and the performance of the pipelines:

  • The ES output in Logstash uses the bulk API, so as to reduce overhead when publishing events. If batches are very small you lose potential throughput.
  • The 2 pipelines are indirectly coupled. Meaning: if one is slow and creates back-pressure, then the back-pressure will slow down the LS (in) phase and Filebeat itself. This results in the second pipeline being slowed down as well.

-> There will be potentially no performance improvement just by creating 2 separate pipelines this way.

When introducing 2 pipelines you want to decouple them, such that they can operate independently. This requires you to duplicate some work or to have a potentially very large buffer that both pipelines can draw events from independently (still with filtering). This can be solved by either introducing Kafka or running 2 Filebeat instances (which is somewhat simpler; see the example commands further below). E.g. by running 2 Filebeat instances you will have 2 file readers, each operating at its own pace. The actual pace will be dictated by ES and the back-pressure generated by the Logstash pipeline itself:

+-----------+         +-----------+
|FB (single)+-------->+LS (single)+---------+
+-----------+         +-----------+         |
                                           +v-+
                                           |ES|
                                           ++-+
+----------+          +----------+          ^
|FB (multi)+--------->+LS (multi)+----------+
+----------+          +----------+

Drawbacks:

  • requires separate ports for each pipeline
  • requires 2 filebeat instances
  • requires additional filtering in Logstash or Filebeat to send/process only single/multi-line events.

Yet you will have completely separate pipelines, allowing the single-line events to outpace the multiline ones.
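
For example (file names and data paths are placeholders), the two Filebeat instances just need separate configs and separate data/registry directories so they don't share state:

# instance 1: the logs that stay single-line
filebeat -c /etc/filebeat/filebeat-main.yml -path.data /var/lib/filebeat-main
# instance 2: the logs that need aggregation
filebeat -c /etc/filebeat/filebeat-aggregation.yml -path.data /var/lib/filebeat-aggregation

Each config lists only its own prospectors and points output.logstash at the matching Logstash port (e.g. 5000 vs 5001).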

Again, I'd recommend testing/benchmarking the 2 architectures to see which one better fits your needs:

  • Is there really a measurable performance difference?
  • Do you really want single/multiline events to become visible at a different pace?

Not sure if this would suit your data/use-case, but one could also have the single-line event pipeline insert place-holder events (use the 'create' action in the Elasticsearch output, so as to not overwrite existing events) and have the multiline pipeline overwrite the place-holder events later (use the 'index' action in the Elasticsearch output).
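
A rough, untested sketch of that idea (hosts and index names are placeholders; it assumes one place-holder document per ID, addressed via document_id):

# single-line pipeline: only create the document if it does not exist yet
output {
  elasticsearch {
    hosts => ["es-host:9200"]
    index => "my_index"
    document_id => "%{ID}"
    action => "create"
  }
}

# aggregation/multiline pipeline: overwrite the place-holder with the full document
output {
  elasticsearch {
    hosts => ["es-host:9200"]
    index => "my_index"
    document_id => "%{ID}"
    action => "index"
  }
}

With 'create', attempts to index an already existing ID should just result in a conflict error being logged rather than a retry.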

