How to Handle Metadata in File Headers

I am using Elasticsearch to collect data from log files. The log files contain metadata in the header relating to the user that logged on. I want to be able to search for events using the metadata in the file header (example below).
UserType: 33883HIJS
AccNo: 939KAKSL892
Version: 4.02.31
Timezone: GMT+05:25

01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD.
01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]

In this example, a search for 'UserType: 33883HIJS' would return both events shown. Is that possible? Can I add these metadata fields as index fields? How should I parse the file in this case?

Try something like

    if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
        drop {}
    } else {
        if [message] =~ /^[0-9a-zA-Z]+:/ {
            dissect { mapping => { "message" => "%{key}: %{value}" } }
            ruby {
                init => '
                    @@metadata = {}
                '
                code => '
                    @@metadata[event.get("key")] = event.get("value")
                '
            }
            drop {}
        } else {
            ruby {
                code => '
                    event.set("metadata", @@metadata)
                '
            }
        }
    }

Essentially, if the line looks like "key: value" then stash it as metadata. If it does not then add all the stashed metadata items to the event.
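To try the stash-and-attach idea outside Logstash, here is a minimal plain-Ruby sketch. The `attach_metadata` helper and the `lines` array are hypothetical names used only for illustration; the real filter works on Logstash events rather than on strings, so treat this as a model of the logic and not of the plugin API.

```ruby
# Plain-Ruby sketch of the stash-and-attach idea (no Logstash required).
# attach_metadata is a hypothetical helper, not part of any Logstash API.
def attach_metadata(lines)
  metadata = {}
  events = []
  lines.each do |line|
    next if line.strip.empty?                # blank separator lines are dropped
    if line =~ /\A([0-9a-zA-Z]+): (.*)\z/    # looks like "key: value", so stash it
      metadata[$1] = $2
    else                                     # anything else is treated as an event line
      events << { "message" => line, "metadata" => metadata.dup }
    end
  end
  events
end

lines = [
  "UserType: 33883HIJS",
  "AccNo: 939KAKSL892",
  "",
  "01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD.",
]
events = attach_metadata(lines)
puts events.first["metadata"].inspect
```

The event line starts with "01-02", so the anchored key pattern does not match it and it falls through to the event branch with the stashed keys attached.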

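I think this requires "--pipeline.workers 1", because the metadata lines have to be processed before the event lines that follow them, and with several workers that ordering is not guaranteed.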

This kind of ruby solution tends to be fragile, and has to be tuned to the input.

I use a class variable (@@metadata) rather than an instance variable (@metadata) because we need the same variable to be visible across multiple ruby filters.
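The difference is easy to see in plain Ruby. In the sketch below, `FilterStub` is a hypothetical stand-in for a filter class; it assumes, as the explanation above implies, that each ruby filter block is a separate object of the same class.

```ruby
# Two objects of one class share @@shared, but each has its own @own.
# FilterStub is a hypothetical stand-in, not the real Logstash ruby filter class.
class FilterStub
  @@shared = {}

  def initialize
    @own = {}
  end

  def stash(key, value)
    @@shared[key] = value   # visible from every FilterStub object
    @own[key] = value       # visible only from this object
  end

  def shared
    @@shared
  end

  def own
    @own
  end
end

writer = FilterStub.new
reader = FilterStub.new
writer.stash("UserType", "33883HIJS")

puts reader.shared.key?("UserType")   # true: the class variable is shared
puts reader.own.key?("UserType")      # false: the instance variable is per-object
```

With an instance variable, the filter that attaches the metadata would see an empty hash, because the values were stashed by a different object.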

Thanks for the quick response, Badger.
Is there a way I can check which values are present in @@metadata? Is there a particular command I can use for that?

The second ruby filter adds a metadata field to your event, so you should see the values on every document.

     "message" => "01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]",
    "metadata" => {
        "UserType" => "33883HIJS",
           "AccNo" => "939KAKSL892",
        "Timezone" => "GMT+05:25",
         "Version" => "4.02.31"
    }

Thanks Badger. I combined your code with the kv filter, and this is what my filter looks like now:

if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
    drop {}
} else {
    if [message] =~ /^[0-9a-zA-Z]+:/ {
        kv {
            value_split => ":"
            target => "kv"
        }
        ruby {
            code => "
                hash = event.to_hash
                hash.each { |key,value|
                    event.set(key, value)
                }
            "
        }
    }

    grok {
        break_on_match => false
        match => { "message" => "%{MONTHNUM:Month}-%{MONTHDAY:Day}\s*%{TIME:Timestamp}\s*%{NONNEGINT:PID}\s*%{NUMBER:Thread_id}\s*%{WORD:Severity}\s*(?<Function>(.*?)):\s*%{GREEDYDATA:LogLine}"}
        add_field => ["Received_at", "%{@timestamp}"]
        add_field => ["Received_from", "%{host}"]
        add_field => ["SysVersion", "[kv][SysVersion]"]
        add_field => ["BuildVersion", "[kv][BuildVersion]"]
    }
}

Can I use kv values in Grok filter like this? Please advise.

That looks like a no-op to me. What are you trying to do there?

Sorry, my bad. You were right: it's easier to stash the metadata and use it, so I changed it as per your earlier suggestion. My logstash filter now looks like this:

if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
    drop {}
} else {
    if [message] =~ /^[0-9a-zA-Z]+:/ {
        dissect { mapping => { "message" => "%{key}: %{value}" } }
        ruby {
            init => '
                @@metadata = {}
            '
            code => '
                @@metadata[event.get("key")] = event.get("value")
            '
        }
        drop {}
    } else {
        ruby {
            code => '
                event.set("metadata", @@metadata)
            '
        }
    }

    grok {
        break_on_match => false
        match => { "message" => "%{MONTHNUM:Month}-%{MONTHDAY:Day}\s*%{TIME:Timestamp}\s*%{NONNEGINT:PID}\s*%{NUMBER:Thread_id}\s*%{WORD:Severity}\s*(?<Function>(.*?)):\s*%{GREEDYDATA:LogLine}"}
        match => { "message" => "%{MONTHNUM:Month}-%{YEAR:Year}\s*%{TIME:Timestamp}\s*%{WORD:Severity}/(?<Function>(.*?))\s*\((?<POSINT:PID>[^)]*)\):\s*%{GREEDYDATA:LogLine}"}
    }
    mutate {
        remove_field => ["@version","host","beat","tags","offset"]
        add_field => ["Received_at", "%{@timestamp}"]
        add_field => ["Received_from", "%{host}"]
    }
}


But now the problem is that every new log file has its own metadata, and this code is not picking it up: the first file's @@metadata is copied to the log lines of all the other files. Is there a way to add a condition so that, when a new file is opened, the new metadata is used?

When writing a post, if you need to include code, or logs, please either precede and follow them with a line containing three backticks (```) or else select the text and click on </> in the toolbar above the edit pane to blockquote the text.

Anyways, yes, you just need to keep a bit more state about whether the filter is processing metadata or events.

if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
    drop {}
} else {
    if [message] =~ /^[0-9a-zA-Z]+:/ {
        dissect { mapping => { "message" => "%{key}: %{value}" } }
        ruby {
            init => '
                @@collectingMetadata = false
            '
            code => '
                unless @@collectingMetadata
                    @@metadata = {}
                    @@collectingMetadata = true
                end
                @@metadata[event.get("key")] = event.get("value")
            '
        }
        drop {}
    } else {
        ruby {
            code => '
                @@collectingMetadata = false
                event.set("metadata", @@metadata)
            '
        }
    }
}
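The extra state can be modelled in plain Ruby to see why the metadata resets at each new header. This is only a sketch: `MetadataTracker` and `process` are hypothetical names, and instance variables stand in for the class variables shared by the two ruby filters.

```ruby
# Plain-Ruby sketch of the collecting / not-collecting state (no Logstash required).
# MetadataTracker is a hypothetical name used only for this illustration.
class MetadataTracker
  def initialize
    @collecting = false
    @metadata = {}
  end

  # Returns nil for lines that would be dropped, or an event hash for log lines.
  def process(line)
    return nil if line.strip.empty? || line == "[Metadata]" || line == "[Events]"
    if (m = line.match(/\A([0-9a-zA-Z]+): (.*)\z/))
      unless @collecting        # first key/value line of a new header:
        @metadata = {}          # forget the previous header's metadata
        @collecting = true
      end
      @metadata[m[1]] = m[2]
      nil
    else
      @collecting = false       # an event line ends the header
      { "message" => line, "metadata" => @metadata }
    end
  end
end

tracker = MetadataTracker.new
lines = [
  "UserType: 33883HIJS", "AccNo: 939KAKSL892", "",
  "01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD.",
  "Version: 4.02.31", "Timezone: GMT+05:25", "",
  "01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]",
]
events = lines.map { |l| tracker.process(l) }.compact
events.each { |e| puts e["metadata"].keys.join(",") }
```

Each event carries only the keys from the header immediately before it, because the first key/value line after an event line starts a fresh hash.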

I will definitely keep that in mind from now on. Thanks again for the prompt reply. When I changed the code as per your suggestion, it picked up only one metadata value; for example, it added just metadata.Timezone: GMT+05:25. It didn't add UserType or the other metadata keys and their values. Any advice?

Not sure. I tested it using

UserType: 33883HIJS
AccNo: 939KAKSL892

01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD.

Version: 4.02.31
Timezone: GMT+05:25

01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]

and I got

      "metadata" => {
           "AccNo" => "939KAKSL892",
        "UserType" => "33883HIJS"
      },
       "message" => "01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD."

      "metadata" => {
         "Version" => "4.02.31",
        "Timezone" => "GMT+05:25"
      },
       "message" => "01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]"

Don't make me guess what your input looks like. Show me.

So basically I have 3 different types of log files to parse. Here is the input for each file:

File x.log


DType: XYZ
SNumber: SSKD293
BuildType: debug
Tags: external
state: e
STime: 190106
ETime: 190108
LType: et
Reason: 154659DED7
Timezone: GMT+08:20
Utc: P280KSL01

01-07 11:01:51.501 3602 3616 I start: LogUpload
01-07 11:01:51.509 3602 10960 I end : ,empty

File y.log


DType: XYZ
SNumber: SSKD293
BuildType: debug
Tags: external
state: e
STime: 190106
ETime: 190108
LType: kt
Reason: 154659DED7
Timezone: GMT+08:25
Utc: P280KSL01

01-01 00:00:12.183 24240 42420 W [349@1]: added new subscriber
01-01 05:30:12.183 30124 0424 W [484@1]: startingservice!
01-01 05:30:12.183 43420 88870 W [0@1]: added vitals

File z.log

DType: XYZ
SNumber: SSKD293
BuildType: debug
Tags: external
state: e
STime: 190106
ETime: 190108
LType: mt
Reason: 154659DED7
Timezone: GMT+08:30
Utc: P280KSL00

01-04 09:12:07.517 4890 16256 I trackerService: check if it remain in the queue
01-04 09:12:07.519 4890 16256 W trackerServiceteDelegate: Insert details in table

So File x.log's [Metadata] should be applied to x.log's [Events], and the same for the others.
When I tried the same script again, it gave me only one metadata value (metadata.Timezone) for all three files. How can I get all the metadata values?

Again thanks for all your help. Really appreciate it.

OK, so I created those three logs in /tmp/a and used this input

input { file { path => "/tmp/a/*.log" sincedb_path => "/dev/null" start_position => "beginning" } }

The SystemVersion metadata does not have a space after the colon, so I had to switch from dissect to a grok filter.

filter {
    if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
        drop {}
    } else {
        if [message] =~ /^[0-9a-zA-Z]+:/ {
            grok { match => [ "message", "^(?<key>[^:]+):\s*%{GREEDYDATA:value}" ] }
            ruby {
                init => '
                    @@collectingMetadata = false
                '
                code => '
                    unless @@collectingMetadata
                        @@metadata = {}
                        @@collectingMetadata = true
                    end
                    @@metadata[event.get("key")] = event.get("value")
                '
            }
            drop {}
        } else {
            ruby {
                code => '
                    @@collectingMetadata = false
                    event.set("metadata", @@metadata)
                '
            }
        }
    }
}

That works just fine for me. Did you set --pipeline.workers 1 ? It actually works for me with more than one worker, but I would not expect it to.

    "metadata" => {
                  "Utc" => "P280KSL00",
             "Timezone" => "GMT+08:30",
                "STime" => "190106",
            "BuildType" => "debug",
                 "Tags" => "external",
                "state" => "e",
                "DType" => "XYZ",
                "ETime" => "190108",
                "LType" => "mt",
               "Reason" => "154659DED7",
        "SystemVersion" => "1290N",
              "SNumber" => "SSKD293"
    },
     "message" => "01-04 09:12:07.519 4890 16256 W trackerServiceteDelegate: Insert details in table",
        "path" => "/tmp/a/z.log"

This is perfect...I changed it to grok and it started working for me too. Thanks for all the help.
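For anyone wondering why the switch from dissect to grok made the difference, the two matching styles can be compared in plain Ruby. This is a rough sketch: both helper names are hypothetical, and the sample line "SystemVersion:1290N" is an assumption reconstructed from the output above and the remark about the missing space. The strict helper only approximates a mapping with a literal ": " delimiter.

```ruby
# Sketch: a literal ": " delimiter versus a regex that allows optional whitespace.
# Both helper names are hypothetical; neither one calls Logstash.
def split_strict(line)
  key, value = line.split(": ", 2)     # needs a colon followed by a space
  value.nil? ? nil : [key, value]
end

def split_tolerant(line)
  # Same shape as the grok expression: key up to the first colon, optional spaces, rest of line.
  m = line.match(/\A(?<key>[^:]+):\s*(?<value>.*)\z/)
  m && [m[:key], m[:value]]
end

p split_strict("SystemVersion:1290N")     # nil
p split_tolerant("SystemVersion:1290N")   # ["SystemVersion", "1290N"]
p split_tolerant("Timezone: GMT+08:30")   # ["Timezone", "GMT+08:30"]
```

The tolerant pattern stops the key at the first colon, so a value that itself contains a colon, such as GMT+08:30, is kept whole.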
