How to Handle Metadata in File Headers


(Deepti) #1

I am using Elasticsearch to collect data from log files. The log files contain metadata in the header relating to the user that logged on. I want to be able to search for events using the metadata in the file header (example below).
[Metadata]
UserType: 33883HIJS
AccNo: 939KAKSL892
Version: 4.02.31
Timezone: GMT+05:25

[Events]
01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD.
01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]

In this example a search for 'UserType: 33883HIJS' would return both events shown. Is that possible? Can I add these metadata fields as index fields? How can I parse this?


#2

Try something like

    if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
        drop {}
    } else {
        if [message] =~ /^[0-9a-zA-Z]+:/ {
            dissect { mapping => { "message" => "%{key}: %{value}" } }
            ruby {
                init => '
                    @@metadata = {}
                '
                code => '
                    @@metadata[event.get("key")] = event.get("value")
                '
            }
            drop {}
        } else {
            ruby {
                code => '
                    event.set("metadata", @@metadata)
                '
            }
        }
    }

Essentially, if the line looks like "key: value" then stash it as metadata. If it does not then add all the stashed metadata items to the event.
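For clarity, here is the same stash-and-attach logic as a plain-Ruby sketch (not Logstash), run over a few of the sample lines from the question:

```ruby
# Plain-Ruby sketch of the filter logic above (not Logstash).
# "key: value" lines are stashed in a hash; any other non-header,
# non-empty line becomes an event carrying a copy of the stash.
metadata = {}
events = []

lines = [
  "[Metadata]",
  "UserType: 33883HIJS",
  "AccNo: 939KAKSL892",
  "",
  "[Events]",
  "01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD."
]

lines.each do |line|
  next if line == "[Metadata]" || line == "[Events]" || line.empty?
  if line =~ /^[0-9a-zA-Z]+:/
    key, value = line.split(": ", 2)
    metadata[key] = value
  else
    events << { "message" => line, "metadata" => metadata.dup }
  end
end
```

After the loop, `events[0]["metadata"]` holds both UserType and AccNo.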

I think this requires "--pipeline.workers 1"
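For reference, that is set when starting Logstash; the config file path here is a placeholder:

```shell
# Run with a single worker thread so events are processed in file
# order and the class variable is not mutated concurrently.
bin/logstash -f pipeline.conf --pipeline.workers 1
```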

This kind of ruby solution tends to be fragile, and has to be tuned to the input.

I use a class variable (@@metadata) rather than an instance variable (@metadata) because the same variable needs to be visible across multiple ruby filters.
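Loosely, the scoping difference looks like this in plain Ruby (two instances standing in for two ruby filters):

```ruby
# Two instances standing in for two ruby filters. The class
# variable @@shared is one copy for the whole class; the instance
# variable @own is a separate copy per object.
class Filter
  def stash(value)
    @@shared = value   # visible to every instance of Filter
    @own = value       # visible only to this instance
  end

  def read_shared
    @@shared
  end

  def read_own
    @own
  end
end

a = Filter.new
b = Filter.new
a.stash("GMT+05:25")
b.read_shared  # => "GMT+05:25"  (shared across instances)
b.read_own     # => nil          (never set on this instance)
```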


(Deepti) #3

Thanks for the quick response, Badger.
Is there a way I can check what values are present in @@metadata? Is there a particular command I can use for that?


#4

The second ruby filter adds a metadata field to your event, so you should see the values on every document.

   "message" => "01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]",
  "metadata" => {
    "UserType" => "33883HIJS",
       "AccNo" => "939KAKSL892",
    "Timezone" => "GMT+05:25",
     "Version" => "4.02.31"
}

(Deepti) #5

Thanks Badger. So I combined your code with the kv filter, and this is what my filter looks like now:

    filter {
        if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
            drop {}
        }
        if [message] =~ /^[0-9a-zA-Z]+:/ {
            kv {
                value_split => ":"
                target => "kv"
            }
            ruby {
                code => "
                    hash = event.to_hash
                    hash.each { |key, value|
                        event.set(key, value)
                    }
                "
            }
        }
        grok {
            break_on_match => false
            match => { "message" => "%{MONTHNUM:Month}-%{MONTHDAY:Day}\s*%{TIME:Timestamp}\s*%{NONNEGINT:PID}\s*%{NUMBER:Thread_id}\s*%{WORD:Severity}\s*(?<Function>(.*?)):\s*%{GREEDYDATA:LogLine}" }
            add_field => ["Received_at", "%{@timestamp}"]
            add_field => ["Received_from", "%{host}"]
            add_field => ["SysVersion", "[kv][SysVersion]"]
            add_field => ["BuildVersion", "[kv][BuildVersion]"]
        }
    }

Can I use kv values in a grok filter like this? Please advise.


#6

That looks like a no-op to me. What are you trying to do there?
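In case the intent was to copy a kv value into a top-level field: `add_field` treats a bare string as a literal, so `"[kv][SysVersion]"` is added verbatim. To substitute the field's value it has to be wrapped in Logstash's `%{...}` sprintf syntax, e.g.:

```
    mutate {
        # %{...} substitutes the value of the referenced field;
        # without it the literal text "[kv][SysVersion]" is added.
        add_field => { "SysVersion" => "%{[kv][SysVersion]}" }
    }
```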


(Deepti) #7

Sorry, my bad. You were right. It's easier to stash the metadata and use it, so I changed it as per your earlier suggestion. My logstash filter now looks like this:

    filter {
        if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
            drop {}
        } else {
            if [message] =~ /^[0-9a-zA-Z]+:/ {
                dissect { mapping => { "message" => "%{key}: %{value}" } }
                ruby {
                    init => '
                        @@metadata = {}
                    '
                    code => '
                        @@metadata[event.get("key")] = event.get("value")
                    '
                }
                drop {}
            } else {
                ruby {
                    code => '
                        event.set("metadata", @@metadata)
                    '
                }
            }
        }
        grok {
            break_on_match => false
            match => { "message" => [
                "%{MONTHNUM:Month}-%{MONTHDAY:Day}\s*%{TIME:Timestamp}\s*%{NONNEGINT:PID}\s*%{NUMBER:Thread_id}\s*%{WORD:Severity}\s*(?<Function>(.*?)):\s*%{GREEDYDATA:LogLine}",
                "%{MONTHNUM:Month}-%{YEAR:Year}\s*%{TIME:Timestamp}\s*%{WORD:Severity}/(?<Function>(.*?))\s*\((?<PID>[^)]*)\):\s*%{GREEDYDATA:LogLine}"
            ] }
        }
        mutate {
            remove_field => ["@version","host","beat","tags","offset"]
            add_field => ["Received_at", "%{@timestamp}"]
            add_field => ["Received_from", "%{host}"]
        }
    }

But now there is a problem: every new log file has new metadata, and this code does not pick it up, so the first file's @@metadata is copied to all the log lines from all the log files. Is there a way to add a condition so that when a new file is opened the new metadata is used?


#8

When writing a post, if you need to include code, or logs, please either precede and follow them with a line containing three backticks (```) or else select the text and click on </> in the toolbar above the edit pane to blockquote the text.

Anyway, yes: you just need to keep a bit more state about whether the filter is processing metadata or events.

if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
    drop {}
} else {
    if [message] =~ /^[0-9a-zA-Z]+:/ {
        dissect { mapping => { "message" => "%{key}: %{value}" } }
        ruby {
            init => '
                @@collectingMetadata = false
            '
            code => '
                unless @@collectingMetadata
                    @@metadata = {}
                    @@collectingMetadata = true
                end
                @@metadata[event.get("key")] = event.get("value")
            '
        }
        drop {}
    } else {
        ruby {
            code => '
                @@collectingMetadata = false
                event.set("metadata", @@metadata)
            '
        }
    }
}

(Deepti) #9

I will definitely keep that in mind from now on. Again, thanks for the prompt reply. When I changed the code as per your suggestion, it picked up only one metadata value. For example, it just added metadata.Timezone: GMT+05:25; it didn't add UserType or the other metadata keys and values. Any advice?


#10

Not sure. I tested it using

[Metadata]
UserType: 33883HIJS
AccNo: 939KAKSL892

[Events]
01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD.

[Metadata]
Version: 4.02.31
Timezone: GMT+05:25

[Events]
01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]

and I got

{
      "metadata" => {
           "AccNo" => "939KAKSL892",
        "UserType" => "33883HIJS"
    },
       "message" => "01-02 00:00:13.289 34562 223162 (W)auditd : SELinux: Loaded services from TPD."
}
{
      "metadata" => {
         "Version" => "4.02.31",
        "Timezone" => "GMT+05:25"
    },
       "message" => "01-03 12:07:58.383 36141 423881 (E)rd_tk_renamed: [22,135]"
}

Don't make me guess what your input looks like. Show me.


(Deepti) #11

So basically I have 3 different types of log files to parse. Here is the input for each file:

File x.log

[Metadata]

DType: XYZ
SNumber: SSKD293
SystemVersion:1290N
BuildType: debug
Tags: external
state: e
STime: 190106
ETime: 190108
LType: et
Reason: 154659DED7
Timezone: GMT+08:20
Utc: P280KSL01

[Events]
01-07 11:01:51.501 3602 3616 I start: LogUpload
01-07 11:01:51.509 3602 10960 I end : ,empty

File y.log

[Metadata]

DType: XYZ
SNumber: SSKD293
SystemVersion:1290N
BuildType: debug
Tags: external
state: e
STime: 190106
ETime: 190108
LType: kt
Reason: 154659DED7
Timezone: GMT+08:25
Utc: P280KSL01

[Events]
01-01 00:00:12.183 24240 42420 W [349@1]: added new subscriber
01-01 05:30:12.183 30124 0424 W [484@1]: startingservice!
01-01 05:30:12.183 43420 88870 W [0@1]: added vitals

File z.log

[Metadata]
DType: XYZ
SNumber: SSKD293
SystemVersion:1290N
BuildType: debug
Tags: external
state: e
STime: 190106
ETime: 190108
LType: mt
Reason: 154659DED7
Timezone: GMT+08:30
Utc: P280KSL00

[Events]
01-04 09:12:07.517 4890 16256 I trackerService: check if it remain in the queue
01-04 09:12:07.519 4890 16256 W trackerServiceteDelegate: Insert details in table

So File x.log's [Metadata] should be applied to x.log's [Events], and the same for the others.
When I tried the same script again it gave me just one metadata value (metadata.Timezone) for all three files. How can I get all the metadata values?

Again thanks for all your help. Really appreciate it.


#12

OK, so I created those three logs in /tmp/a and used this input

input { file { path => "/tmp/a/*.log" sincedb_path => "/dev/null" start_position => "beginning" } }

The SystemVersion metadata does not have a space after the colon, so I had to switch from dissect to a grok filter.
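The difference can be seen in plain Ruby, using the regex equivalent of that grok pattern:

```ruby
# Equivalent of the grok pattern "^(?<key>[^:]+):\s*%{GREEDYDATA:value}":
# the whitespace after the colon is optional, so a missing space
# (as in "SystemVersion:1290N") still parses. dissect's
# "%{key}: %{value}" mapping requires the literal ": " delimiter.
pattern = /^(?<key>[^:]+):\s*(?<value>.*)/

with_space    = pattern.match("BuildType: debug")
without_space = pattern.match("SystemVersion:1290N")
# with_space[:key]    == "BuildType",     with_space[:value]    == "debug"
# without_space[:key] == "SystemVersion", without_space[:value] == "1290N"
```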

filter {
if [message] == "[Metadata]" or [message] == "[Events]" or [message] =~ /^$/ {
    drop {}
} else {
    if [message] =~ /^[0-9a-zA-Z]+:/ {
        grok { match => [ "message", "^(?<key>[^:]+):\s*%{GREEDYDATA:value}" ] }
        ruby {
            init => '
                @@collectingMetadata = false
            '
            code => '
                unless @@collectingMetadata
                    @@metadata = {}
                    @@collectingMetadata = true
                end
                @@metadata[event.get("key")] = event.get("value")
            '
        }
        drop {}
    } else {
        ruby {
            code => '
                @@collectingMetadata = false
                event.set("metadata", @@metadata)
            '
        }
    }
}
}

That works just fine for me. Did you set --pipeline.workers 1? It actually works for me with more than one worker, but I would not expect it to.

  "metadata" => {
              "Utc" => "P280KSL00",
         "Timezone" => "GMT+08:30",
            "STime" => "190106",
        "BuildType" => "debug",
             "Tags" => "external",
            "state" => "e",
            "DType" => "XYZ",
            "ETime" => "190108",
            "LType" => "mt",
           "Reason" => "154659DED7",
    "SystemVersion" => "1290N",
          "SNumber" => "SSKD293"
},
   "message" => "01-04 09:12:07.519 4890 16256 W trackerServiceteDelegate: Insert details in table",
      "path" => "/tmp/a/z.log"

(Deepti) #13

This is perfect... I changed it to grok and it started working for me too. Thanks for all the help.