Help with conf file to process log with binary data

Hi, I have log files that contain some binary data. For now I just want to try to ingest them using stdin and build a grok filter later. However, it is not ingesting even the first log file. There is no error, it just hangs.

My conf file:

input {
  file {
    path => "/opt/sample-data/xx-Logs-v1/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

output {
  elasticsearch {
    hosts => "http://10.0.2.15:9200"
    index => "xx-logs-v1.0"
  }
  stdout {}
}

==================================================

Sample Log File Content:

Mon 11/13/2017 15:06:21.54 xx_ver_xxx 1710

xx_ver_xxx 1710

processing C:\opt\xxBuild\1710\xx_xx_matched.dat vs C:\opt\xxBuild\1710\xx_xx_matched.dat.
C:\opt\xxBuild\1710\xx_xx_matched.dat - Opened
C:\opt\xxBuild\1710\xx_xx_matched.dat - Opened
...not yet at EOF for C:\opt\xxBuild\1709\xx_xx_matched.dat... 0 recs left over.

Done (0 00:00:13).
198830419 recs read from C:\opt\xxBuild\1710\xx_xx_matched.dat.
198595230 recs read from C:\opt\xxBuild\1709\xx_xx_matched.dat.
198611163 recs matched Z11
124531056 of these were both MF == 1.


0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
1 0 313746 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

@sconrod, if you want to debug a logstash input or filter, then do not send the data to elasticsearch. Set your output to

output { stdout { codec => rubydebug } }

Review the output from logstash (do not try to review the input to elasticsearch using Kibana). Once you have the right data in the right fields, with the right types, in the logstash output, then you can start putting it into ES. (When I say types, as an example, if you need to do mutate/convert to make something an integer, make sure it looks like

"someField" => 42,

in the rubydebug output and not

"someField" => "42",

which would indicate it is a string. It looks like a trivial difference, but it is not, since it avoids mapping conflicts as you do incremental debugging.)
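
As a rough sketch of that check (assuming a field called someField, as in the example above, that arrives as a string), a mutate/convert followed by the rubydebug output lets you confirm the type before anything goes to ES:

```
filter {
  # Convert the string value to an integer; "someField" is the placeholder name used above
  mutate { convert => { "someField" => "integer" } }
}

output {
  # Debug output only; add the elasticsearch output once the fields look right
  stdout { codec => rubydebug }
}
```

If the conversion worked, the value is printed without quotes in the rubydebug output.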

The file input expects newline delimited files. If you literally have binary data in your logs, that probably is not going to work well. If you have multi-line hex dumps of binary data in your logs it might work, or perhaps a multiline input codec would be better. Without seeing more detail on the input, and especially the rubydebug output, it is hard to tell.

Hi Badger, I did try that, but it never processes anything. It just hangs forever with no error. This does not happen with any other log type, only with the log that contains binary data.

Hi Badger, I got the log ingested from standard in.

Now I want to just grok out several fields of the data and not see the rest.

Here are the lines of data from the log that I am interested in:

Mon 11/13/2017 15:06:21.54 s1_ver_cmp 1710

s1_ver_cmp 1710

198830419 recs read from D:\data\s1Build\1710\data_s1_matched.dat.
198595230 recs read from D:\data\s1Build\1709\data_s1_matched.dat.
198611163 recs matched Z11
124531056 of these were both MF == 1.

Percent of data exact matches that changed s1 code: 0.000 %
Percent of data exact matches that stayed the same s1 code: 100.000 %


Here is my mapping:
PUT e1-logs-v4
{
  "mappings": {
    "doc": {
      "properties": {
        "Name": {
          "type": "text",
          "fields": {
            "raw": {
              "type": "keyword"
            }
          }
        },
        "Number": { "type": "integer", "ignore_malformed": true },
        "FileDate": { "type": "date" },
        "Path": { "type": "text" },
        "filename": { "type": "keyword" }
      }
    }
  }
}


I would like to grok out as follows to the mappings:

Mon 11/13/2017 15:06:21.54
grok to date in YYYYMMDD format

s1_ver_cmp 1710
grok to filename

198830419 recs read from D:\data\s1Build\1710\data_s1_matched.dat.
grok to new field that is aggregatable: recs_read

198595230 recs read from D:\data\s1Build\1709\data_s1_matched.dat.
grok the number to the number field
grok the recs read from to the name field

198611163 recs matched Z11
grok to the field: number and recs matched z11 to name
124531056 of these were both MF == 1.

Percent of data exact matches that changed s1 code: 0.000 %
grok the percentage to the number field in a percentage format Or create a new field "percent"

Percent of data exact matches that stayed the same s1 code: 100.000 %
same as last one

@sconrod Firstly, I just checked in for the first time in a week, and had that not been within an hour or so of you posting I would never have seen your reply. If you start the post with an @ mention, I get a notification and would have seen your reply even if I had been away for another week.

By default a stdin input creates a separate event for each line of input. Thus "Mon 11/13/2017 15:06:21.54" and "198611163 recs matched Z11" are separate events, which after processing would be indexed as separate documents. I think it is more likely you want the entire multi-line log entry treated as a single event. For that you need to find a regexp that tells the logstash input when to start a new event. Does every new event, and only new events, have a date at the start of a line?

Assuming they do, the first thing is to configure a multiline codec on your stdin input. That will look something like this

input {
  stdin {
    codec => multiline {
      pattern => "^(Mon|Tue|Wed|Thu|Fri|Sat|Sun) "
      negate => true
      what => previous
    }
  }
}

output { stdout { codec => rubydebug } }

That says that any line that does NOT match the pattern belongs to the previous event, so a new event starts every time logstash sees a line that does match the pattern (a day name at the start of the line). Note that this means that if you just feed one event to logstash it cannot emit it, because it has not seen the start of the second event, which is what tells it the first event is complete.
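
One way around that, as a sketch rather than a tested config: the multiline codec has an auto_flush_interval option (in seconds) that emits a pending event when no new line has arrived for that long. The value of 2 below is an arbitrary choice.

```
input {
  stdin {
    codec => multiline {
      pattern => "^(Mon|Tue|Wed|Thu|Fri|Sat|Sun) "
      negate => true
      what => previous
      # Flush the buffered event if no further lines arrive within 2 seconds (arbitrary value)
      auto_flush_interval => 2
    }
  }
}
```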

Once you have that working your events will have a message field that looks like this

 "message" => "Mon 11/13/2017 15:06:21.54 s1_ver_cmp 1710\n\ns1_ver_cmp 1710\n\n198830419 recs read from D:\\data\\s1Build\\1710\\data_s1_matched.dat.\n198595230 recs read from D:\\data\\s1Build\\1709\\data_s1_matched.dat.\n198611163 recs matched Z11\n124531056 of these were both MF == 1.\n\nPercent of data exact matches that changed s1 code: 0.000 %\nPercent of data exact matches that stayed the same s1 code: 100.000 %",

You could try to build a single grok filter that matches that, but it will be extremely fragile as you edit it and you will drive yourself nuts doing it. So build grok filters that match each line. There is no magic quoting syntax for a newline inside a grok pattern, just split the pattern across lines. So for example, we can pull out the percent of matches that stayed the same using:

grok {
    match => { "message" => [ "
Percent of data exact matches that stayed the same s1 code: %{NUMBER:percentSame:float} %" ] }
}

The two lines that show the number of records read would both match the same pattern, so we need a pattern that combines them. If you remove the third %{INT} you will find out why I added it :slight_smile:

  grok {
    match => { "message" => [ "
%{INT:firstFileRecords:int} recs read from %{PATH:firstFile}\.
%{INT:secondFileRecords:int} recs read from %{PATH:secondFile}\.
%{INT}" ] }
  }

The date is the only one that we can anchor using ^

grok {
  match => { "message" => [ "^%{DAY} %{DATE:date} %{TIME:time}\.%{INT:subsecond}" ] }
  add_field => { "stamp" => "%{date} %{time}.%{subsecond}" }
}

Then you can use a date filter to parse the stamp field into the timestamp, then a mutate/remove_field to get rid of the intermediates (once you know they are working properly).

@Badger
Hi, thank you. I have tried this and the first step, setting the multiline codec, worked. But when I added the remaining grok filters I got a parse error, even though I verified the YAML is valid with a YAML linter. I believe it is an issue with my syntax. I would greatly appreciate you taking a look:

My conf file:
input {
stdin {
codec => multiline {
pattern => "^(Mon|Tue|Wed|Thu|Fri|Sat|Sun) "
negate => true
what => previous
}
}
file {
path => "/opt/sample-data/E1-logs-v1/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}

filter {

grok {
match => ["path", "/opt/sample-data/E1-logs-v1/%{DATA:filename:keyword}.log"] }

grok {
match => [ "message","Percent of data exact matches that stayed the same s1 code:", %{NUMBER:percentSame:float} %" ] }

grok {
match => [ "message","%{INT:firstFileRecords:int} recs read from %{PATH:firstFile}.%{INT:secondFileRecords:int} recs read from %{PATH:secondFile}.%{INT}" ] }

grok {
match => [ "message", "^%{DAY} %{DATE:date} %{TIME:time}.%{INT:subsecond}" ]
add_field => ["stamp","%{date} %{time}.%{subsecond}" ]
}
}

output {
elasticsearch {
hosts => "http://10.0.2.15:9200"
index => "e1-logs-v7"
}

stdout {codec => rubydebug }
}


[ERROR] 2017-12-19 13:43:07.685 [Ruby-0-Thread-1: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:22] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, ", ', -, [, { at line 23, column 90 (byte 446) after filter {\n\ngrok {\n match => ["path", "/opt/sample-data/E1-logs-v1/%{DATA:filename:keyword}.log"] }\n\n\n\ngrok {\n match => [ "message", "Percent of data exact matches that stayed the same s1 code:", ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_ast'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:54:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:107:in `compile_lir'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:49:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:215:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:35:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:335:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:332:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:319:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'",
"/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:362:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

@sconrod Change

match => [ "message","Percent of data exact matches that stayed the same s1 code:", %{NUMBER:percentSame:float} %" ] }

to

match => [ "message","Percent of data exact matches that stayed the same s1 code: %{NUMBER:percentSame:float} %" ] }

When posting, if you wrap your configs in <pre> and </pre> they are easier for people to read.


@Badger
It looks like I got these two grok filters working, but the rest are not working yet.
Thank you for these two working ones. I am plugging away on the others and will update if I cannot get them going in another day. Thanks for all your help. Learning more.

grok {
  match => ["path", "/opt/sample-data/E1-logs-v1/%{DATA:filename:keyword}.log"]
}

grok {
  match => { "message" => [ "Percent of MAF exact matches that stayed the same E1 code: %{NUMBER:percentSame:float} %" ] }
}


However, what is super weird is that when I add in the third one, it breaks all of them. Below are all three together:

grok {
  match => ["path", "/opt/sample-data/E1-logs-v1/%{DATA:filename:keyword}.log"]
}

grok {
  match => { "message" => [ "Percent of MAF exact matches that stayed the same E1 code: %{NUMBER:percentSame:float} %" ] }
}

grok {
  match => { "message" => [ "Percent of MAF exact matches that changed E1 code: %{NUMBER:percentChanged:float} %" ] }
}


Here is the error:

{
"path" => "/opt/sample-data/E1-logs-v1/e1_ver_cmp_1710_edit.log",
"@timestamp" => 2017-12-21T00:56:37.831Z,
"filename" => "e1_ver_cmp_1710_edit",
"@version" => "1",
"host" => "ubuntu-16",
"message" => "Percent of MAF exact matches that changed E1 code: 0.000 %",
"tags" => [
[0] "_grokparsefailure"
]
}
{
"path" => "/opt/sample-data/E1-logs-v1/e1_ver_cmp_1710_edit.log",
"percentSame" => 100.0,
"@timestamp" => 2017-12-21T00:56:37.848Z,
"filename" => "e1_ver_cmp_1710_edit",
"@version" => "1",
"host" => "ubuntu-16",
"message" => "Percent of MAF exact matches that stayed the same E1 code: 100.000 %",
"tags" => [
[0] "_grokparsefailure"
]
}

You have the multiline codec for stdin, but not for the file input. So every line is being parsed independently for the files, and they will always fail to match one of the groks if you have more than one. Lines from stdin will not have a path field, so they will get a grok parse failure for that one.
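
If you do keep both inputs, one option is to guard the path grok with a conditional so it only runs on events that actually have a path field. A minimal sketch, reusing the pattern from your config with the :keyword suffix left off:

```
filter {
  # Events from stdin have no "path" field, so skip this grok for them
  if [path] {
    grok {
      match => { "path" => "/opt/sample-data/E1-logs-v1/%{DATA:filename}.log" }
    }
  }
}
```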

@Badger

Hi, I have that. I should have pasted the entire config file. Here it is:

input {
  stdin {
    codec => multiline {
      pattern => "^(Mon|Tue|Wed|Thu|Fri|Sat|Sun) "
      negate => true
      what => previous
    }
  }
  file {
    path => "/opt/sample-data/E1-logs-v1/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok {
    match => ["path", "/opt/sample-data/E1-logs-v1/%{DATA:filename:keyword}.log"]
  }

  grok {
    match => [ "message","Percent of MAF exact matches that stayed the same E1 code: %{NUMBER:percentSame:float} %" ]
  }

  grok {
    match => [ "message","Percent of MAF exact matches that changed E1 code: %{NUMBER:percentChanged:float} %" ]
  }

  grok {
    match => { "message" => [ "%{INT:firstFileRecords:int} recs read from %{PATH:firstFile}\.%{INT:secondFileRecords:int} recs read from %{PATH:secondFile}\.%{INT}" ] }
  }

  grok {
    match => { "message" => [ "^%{DAY} %{DATE:date} %{TIME:time}\.%{INT:subsecond}" ] }
    add_field => { "stamp" => "%{date} %{time}.%{subsecond}" }
  }
}

output {
  stdout { codec => rubydebug }
}

@sconrod OK, so let's get rid of the stdin input, and just use the file input. That needs a multiline codec added (the one on the stdin input does not impact the file input).

input {
  file {
    codec => multiline {
      pattern => "^(Mon|Tue|Wed|Thu|Fri|Sat|Sun) "
      negate => true
      what => previous
    }
    path => "/tmp/X*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok { match => { "path" => "/(?<filename>[^/]+).log" } }
  grok { match => { "message" => "^%{DAY} %{DATE:date} %{TIME:time}\.%{INT:subsecond}" } }
  grok { match => { "message" => "Percent of %{WORD} exact matches that stayed the same %{WORD} code: %{NUMBER:percentSame:float} %" } }
  grok { match => { "message" => "Percent of %{WORD} exact matches that changed %{WORD} code: %{NUMBER:percentChanged:float} %" } }
  grok { match => { "message" => "%{INT:firstFileRecords:int} recs read from %{PATH:firstFile}\.
%{INT:secondFileRecords:int} recs read from %{PATH:secondFile}\.
%{INT}" } }
  mutate { add_field => { "stamp" => "%{date} %{time}.%{subsecond}" } }
  date {
    match => [ "stamp", "MM/dd/yyyy HH:mm:ss.SS" ]
    timezone => "Asia/Baku"  
  }
  mutate { remove_field => [ "date",  "time", "subsecond", "stamp" ] }
}

It appears that once a grok fails, subsequent groks do not do anything, so keep the working ones at the top of the filter, and tune the one at the bottom. Yeah, I know you are probably not in Baku, it's a placeholder :slight_smile:
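
To see which grok is the one failing while you tune, one approach is to give each filter its own failure tag through the tag_on_failure option. The tag names below are made up for illustration:

```
filter {
  grok {
    match => { "message" => "Percent of %{WORD} exact matches that stayed the same %{WORD} code: %{NUMBER:percentSame:float} %" }
    # Hypothetical tag name; used instead of the default _grokparsefailure for this filter
    tag_on_failure => [ "_grokfail_percentSame" ]
  }
  grok {
    match => { "message" => "Percent of %{WORD} exact matches that changed %{WORD} code: %{NUMBER:percentChanged:float} %" }
    # Hypothetical tag name for the second filter
    tag_on_failure => [ "_grokfail_percentChanged" ]
  }
}
```

The tags array in the rubydebug output then names the filter that did not match.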

@Badger
Would it be better to use Filebeat for this?

You are going to end up doing the parsing in logstash anyway, so how you get the files off disk does not make much difference.

@Badger
Still not working. I would like to start from scratch and simplify this.

Say I only want to grok these two lines and nothing else:

Percent of MAF exact matches that changed E1 code: 0.000 %
Percent of MAF exact matches that stayed the same E1 code: 100.000 %

So that I have two fields added, both of type keyword:

E1-MAF-MATCH
E1-MAF-CHANGED

and the 100.00 will be an integer.

Here is my try:

grok{
match => ["message" => "%{DATA:log-message} "changed E1 code:" %{NUMBER:Number:INT}"]
add_field => ["MAF-E1-CHANGED:keyword", "%{log-message}"]
}

grok{
match => ["message" => "%{DATA:log-message} "stayed the same E1 code:" %{NUMBER:Number:INT}"]
add_field => ["MAF-E1-SAME:keyword", "%{log-message}"]
}

@sconrod

grok{
  match => ["message" => "%{DATA:log-message} "changed E1 code:" %{NUMBER:Number:INT}"]
  add_field => ["MAF-E1-CHANGED:keyword", "%{log-message}"]
}

  1. You can do either 'match => [ "fieldName", "pattern" ]' or 'match => { "fieldName" => "pattern" }'. Either will work, but you are mixing the two forms, which does not work.

  2. The pattern should be one quoted string. Remove the quotes inside the pattern:

"%{DATA:log-message} "changed E1 code:" %{NUMBER:Number:INT}"

should be

"%{DATA:log-message} changed E1 code: %{NUMBER:Number:INT}"

  3. Appending :keyword to the field name probably does not do what you want it to do.

  4. '%{NUMBER:Number:INT}' should be '%{NUMBER:Number:int}' (or even '%{NUMBER:Number:float}')
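
Putting those four points together, a corrected version of the first filter might look like the sketch below. It uses float because the values look like 0.000 and 100.000, and it drops the add_field with the :keyword suffix. As far as I know the keyword type is something you set in the Elasticsearch mapping, not in grok. The "stayed the same" filter would follow the same shape.

```
filter {
  grok {
    # Hash form of match, one quoted pattern, lowercase type suffix
    match => { "message" => "%{DATA:log-message} changed E1 code: %{NUMBER:Number:float}" }
  }
}
```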
