Logstash as a parser for data in columns

Hi,
I'm wondering about an approach for this specific data, which is arranged in columns. How can I parse it using only Logstash? Some of the fields, especially "ENUM", "HEADER", "BLOCK", "DATE", etc., change dynamically.

Each file is divided into columns and rows, as marked in the picture.
Do you have any idea how to parse it?

[screenshot: Capture_EL_2]

Is this a CSV file?
Is the file at least always the same format?

You could either define multiple grok patterns (if you know how the fields change) or use a CSV filter, define your delimiter, and automatically detect the headers.

csv {
  autodetect_column_names => true
  autogenerate_column_names => false
}

But can I use the CSV filter when these values or columns are not separated by semicolons or commas?

As long as there is a consistent delimiter you can define what that delimiter is manually.
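For example, a sketch assuming the columns are tab-separated (keep in mind the separator must match exactly, so a variable number of spaces or tabs between columns will not split cleanly):

csv {
  separator => "	"    # a literal tab character
  autodetect_column_names => true
}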

Can you share an example?

And the second thing: how can I move the row beginning with "SERVID..." up to the first line, so it is handled as one row across the columns?

Check out the example in the documentation; just define a separator.

If your output is falling on multiple lines then you first need to use a multiline input to get everything into one document then you can apply the csv filter.
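A sketch of that two-step approach (the pattern below is an assumption; adjust it to whatever marks the start of a record in your data):

input {
  file {
    path => "/path/to/data.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      # every line that does not start a new record is joined to the previous one
      pattern => "^ENUM "
      negate => true
      what => "previous"
    }
  }
}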

This is definitely not an easy task you have here, but with enough massaging it can be done.

Is there anything you can do to make the output from your app more machine readable?


@Badger You're good at this stuff, can you help with the concept?
How can I rebuild it so that I can use autodetect_column_names?
[screenshot: InkedCapture_EL_2_LI_2]

the row of data:

ENUM HEADER                          BLOCK   DATE    TIME
1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134850800

SERVID     USERID     REASON
0          2          32

I've tried to use this method but it doesn't match yet.
In fact, the spacing is non-uniform and there are several tabs between columns.

input {
  file {
    path => "/opt/data/input/test.txt*"
    start_position => "beginning"
    codec => multiline { pattern => "^\d" negate => false what => previous }
    sincedb_path => "/dev/null"
  }
}

filter {

  if [message] =~ /^ENUM/ {
    mutate { add_field => { "[@metadata][format]" => "format1" } }
  } else {
    mutate { add_field => { "[@metadata][format]" => "format2" } }
  }

  if [@metadata][format] == "format1" {
    csv {
      separator => " "
      autodetect_column_names => true
      autogenerate_column_names => false
    }
    if [ENUM] == "ENUM" { drop {} }
    if [EVENT] == "EVENT" { drop {} }
    if [MOID] == "MOID" { drop {} }
    if [ATTRBID] == "ATTRBID" { drop {} }
  } else {
    csv {
      separator => " "
      autodetect_column_names => true
      autogenerate_column_names => false
    }
    if [SERVID] == "SERVID" { drop {} }
  }
}

output {
  stdout {
    codec => rubydebug {}
  }
}

Ok. I played around with it a bit and I need clarification about your test.txt file.

Does your sample data look like this?

ENUM HEADER                          BLOCK   DATE    TIME
1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134850800

SERVID     USERID     REASON
0          2          32

ENUM HEADER                          BLOCK   DATE    TIME
1185 LDAP FAILURE IN SEARCH RESULT   SDABS   2113414  1343043800

SERVID     USERID     REASON
0          2          32

ENUM HEADER                          BLOCK   DATE    TIME
1194 LDAP FAILURE IN SEARCH RESULT   SDABS   2323214  13483230800

SERVID     USERID     REASON
0          2          32

or does it look like this?

ENUM HEADER                          BLOCK   DATE    TIME
1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134850800

SERVID     USERID     REASON
0          2          32

1185 LDAP FAILURE IN SEARCH RESULT   SDABS   2113414  1343043800

0          2          32

1194 LDAP FAILURE IN SEARCH RESULT   SDABS   2323214  13483230800

0          2          32

Better yet, could you post multiple entries from your sample data?

Because the first one you may be able to massage into something you can use but the second one probably not (especially with column headers).

Is there any way you can get your data output to be better?

This is data in the form of printouts from the system; unfortunately it cannot be presented any other way. I'm posting the entire file to give a broad perspective of what it looks like. Maybe someone has an idea how to convert it into a good form in Logstash.

link for data input https://easyupload.io/booeol

Thanks, that helps... Unfortunately I had to do something dirty: use a multiline codec to combine the lines, remove all of the \n newline characters using a gsub, and then write a grok filter to get the information out.
It's not pretty... but I got results.

INPUT

ENUM HEADER                          BLOCK   DATE    TIME 1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134851800  SERVID     USERID     REASON                            0          2          32  MOID                                                      ATTRBID  

GROK

ENUM%{SPACE}HEADER%{SPACE}BLOCK%{SPACE}DATE%{SPACE}TIME %{NUMBER:ENUM}%{SPACE}%{DATA:HEADER}%{SPACE}%{WORD:BLOCK}%{SPACE}%{NUMBER:DATE}%{SPACE}%{NUMBER:TIME}%{SPACE}SERVID%{SPACE}USERID%{SPACE}REASON%{SPACE}%{NUMBER:SERVID}%{SPACE}%{NUMBER:USERID}%{SPACE}%{NUMBER:REASON}

OUTPUT

{
  "ENUM": [
    [
      "1184"
    ]
  ],
  "HEADER": [
    [
      "LDAP FAILURE IN SEARCH RESULT"
    ]
  ],
  "BLOCK": [
    [
      "SDABS"
    ]
  ],
  "DATE": [
    [
      "211214"
    ]
  ],
  "TIME": [
    [
      "134851800"
    ]
  ],
  "SERVID": [
    [
      "0"
    ]
  ],
  "USERID": [
    [
      "2"
    ]
  ],
  "REASON": [
    [
      "32"
    ]
  ]
}

Until you can get some way to better transform the output you are going to have to manually define your grok patterns.

My logstash output:

{
        "USERID" => "2",
       "message" => "ENUM HEADER                          BLOCK   DATE    TIME 1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134851800  SERVID     USERID     REASON                            0          2          32  MOID                                                      ATTRBID",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "asp123.myserver.com",
        "HEADER" => "LDAP FAILURE IN SEARCH RESULT",
          "TIME" => "134851800",
          "ENUM" => "1184",
          "DATE" => "211214",
        "SERVID" => "0",
    "@timestamp" => 2021-12-22T19:16:38.162Z,
        "REASON" => "32",
         "BLOCK" => "SDABS"
}
[INFO ] 2021-12-22 14:16:40.609 [LogStash::Runner] runner - Logstash shut down.

Here's my logstash config:

input {
  generator {
    message => '
S  P 2112141349 AF-7      

EVENT REPORTING RESULT

ENUM HEADER                          BLOCK   DATE    TIME
1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134851700

SERVID     USERID     REASON                           
0          2          32

MOID                                                   


ATTRBID


ENUM HEADER                          BLOCK   DATE    TIME
1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134851800

SERVID     USERID     REASON                           
0          2          32

MOID                                                   


ATTRBID


ENUM HEADER                          BLOCK   DATE    TIME
1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211214  134851800

SERVID     USERID     REASON                           
0          2          32

MOID                                                   


ATTRBID

'
  count => 1
  codec => multiline {
    pattern => 'ENUM HEADER                          BLOCK   DATE    TIME'
    what => previous
    negate => true
   }

  }

}

filter {
	mutate {
		gsub => ["message","\n"," "]
	}
	
	grok {
		match => {"message" => "ENUM%{SPACE}HEADER%{SPACE}BLOCK%{SPACE}DATE%{SPACE}TIME %{NUMBER:ENUM}%{SPACE}%{DATA:HEADER}%{SPACE}%{WORD:BLOCK}%{SPACE}%{NUMBER:DATE}%{SPACE}%{NUMBER:TIME}%{SPACE}SERVID%{SPACE}USERID%{SPACE}REASON%{SPACE}%{NUMBER:SERVID}%{SPACE}%{NUMBER:USERID}%{SPACE}%{NUMBER:REASON}"}
	}
}

output {

stdout{}
}

Thanks a lot, then I'll try to attach it to the stream.

If it works, please mark the idea that worked best for you as a solution :)

How should the file input look if I want to parse several files at a time?
The root cause was "count => 1":

input {
  file {
    mode => read
    path => "/opt/data/input/BC*"
    file_completed_action => "log"
    file_completed_log_path => "/opt/data/logstash_files/fin_log"
    sincedb_path => "/dev/null"

    count => 1
    codec => multiline {
      pattern => 'ENUM HEADER                          BLOCK   DATE    TIME'
      what => previous
      negate => true
    }
  }
}
[ERROR] 2021-12-23 21:04:15.194 [Converge PipelineAction::Create<hlr_40>] file - Unknown setting 'count' for file
[ERROR] 2021-12-23 21:04:15.211 [Converge PipelineAction::Create<hlr_40>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:hlr_40, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ConfigurationError) Something is wrong with your configuration.", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:119)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:86)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:837)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1169)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1156)", "org.jruby.ir.targets.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:39)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:333)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:87)", "org.jruby.RubyClass.newInstance(RubyClass.java:939)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52)", 
"usr.share.logstash.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "usr.share.logstash.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:829)"]}
warning: thread "Converge PipelineAction::Create<hlr_40>" terminated with exception (report_on_exception is true):
LogStash::Error: Don't know how to handle `Java::JavaLang::IllegalStateException` for `PipelineAction::Create<hlr_40>`
          create at org/logstash/execution/ConvergeResultExt.java:135
             add at org/logstash/execution/ConvergeResultExt.java:60
  converge_state at /usr/share/logstash/logstash-core/lib/logstash/agent.rb:396
[ERROR] 2021-12-23 21:04:15.224 [Agent thread] agent - An exception happened when converging configuration {:exception=>LogStash::Error, :message=>"Don't know how to handle `Java::JavaLang::IllegalStateException` for `PipelineAction::Create<hlr_40>`"}
[FATAL] 2021-12-23 21:04:15.246 [LogStash::Runner] runner - An unexpected error occurred! {:error=>#<LogStash::Error: Don't know how to handle `Java::JavaLang::IllegalStateException` for `PipelineAction::Create<hlr_40>`>, :backtrace=>["org/logstash/execution/ConvergeResultExt.java:135:in `create'", "org/logstash/execution/ConvergeResultExt.java:60:in `add'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:396:in `block in converge_state'"]}
[FATAL] 2021-12-23 21:04:15.270 [LogStash::Runner] Logstash - Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:747) ~[jruby-complete-9.2.20.1.jar:?]
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:710) ~[jruby-complete-9.2.20.1.jar:?]
        at usr.share.logstash.lib.bootstrap.environment.<main>(/usr/share/logstash/lib/bootstrap/environment.rb:94) ~[?:?]

How can I make a timestamp from the [DATE] and [TIME] fields? Hmm, the messages also don't parse through grok as expected.

You have "count" as a setting. That's not a valid setting for the file input; please read the docs.

However, if you want to parse multiple files then you can define a pattern of file names:

path => ["/var/log/alog.*","/var/log/adifferentlog/*.log"]
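For the DATE/TIME question, one option is to join the two fields and feed the result to a date filter. A sketch; the formats yyMMdd and HHmmssSSS are assumptions based on the sample values 211214 and 134850800:

filter {
  mutate {
    # build a scratch field from the two parsed columns
    add_field => { "[@metadata][event_time]" => "%{DATE} %{TIME}" }
  }
  date {
    match => ["[@metadata][event_time]", "yyMMdd HHmmssSSS"]
    target => "@timestamp"
  }
}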

It works, but only for the generator config; when I use the file plugin, the multiline codec behaves differently.

input {
  file {
    mode => read
    path => "/opt/data/input/BC*"
    file_completed_action => "log"
    file_completed_log_path => "/opt/data/logstash_files/fin_log"
    sincedb_path => "/dev/null"

    codec => multiline {
      pattern => 'ENUM HEADER                          BLOCK   DATE    TIME'
      what => previous
      negate => true
    }
  }
}
{
          "path" => "/opt/data/input/BC*",
       "message" => "ENUM HEADER                          BLOCK   DATE    TIME\r 1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211223  200256700\r SERVID     USERID     REASON                           \r 0          2          32\r MOID                                                   \r \r ATTRBID\r \r",
          "host" => "3d174836d8ac",
      "@version" => "1",
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ],
    "@timestamp" => 2021-12-24T10:17:20.086Z
}
{
          "path" => "/opt/data/input/BC*",
       "message" => "ENUM HEADER                          BLOCK   DATE    TIME\r 1184 LDAP FAILURE IN SEARCH RESULT   SDABS   211223  200256700\r SERVID     USERID     REASON                           \r 0          2          32\r MOID                                                   \r \r ATTRBID\r \r",
          "host" => "3d174836d8ac",
      "@version" => "1",
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ],
    "@timestamp" => 2021-12-24T10:17:20.093Z
}

Change that to DATE%{SPACE}TIME%{SPACE}%{NUMBER:ENUM}%{SPACE}

You may also want start_position => beginning on your file input.
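Putting those two changes together, a sketch of the filter side for the file input (the gsub for \r is an assumption based on the carriage returns visible in the message field above; the grok applies the suggested DATE%{SPACE}TIME change to the earlier pattern):

filter {
  mutate {
    # the file input keeps Windows carriage returns; fold them into spaces
    gsub => ["message", "[\r\n]", " "]
  }

  grok {
    match => { "message" => "ENUM%{SPACE}HEADER%{SPACE}BLOCK%{SPACE}DATE%{SPACE}TIME%{SPACE}%{NUMBER:ENUM}%{SPACE}%{DATA:HEADER}%{SPACE}%{WORD:BLOCK}%{SPACE}%{NUMBER:DATE}%{SPACE}%{NUMBER:TIME}%{SPACE}SERVID%{SPACE}USERID%{SPACE}REASON%{SPACE}%{NUMBER:SERVID}%{SPACE}%{NUMBER:USERID}%{SPACE}%{NUMBER:REASON}" }
  }
}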