Creating and using variables across the logstash config


(Muhamadli302) #1

Hi,

I'm using Logstash 2.2 to read a CSV of events.
I want the process to be fully automatic, so to get a meaningful index name I "implanted" the name I want into the CSV itself, intending to parse it in a filter and use it in the output. Here is my config file:

`input {
    beats {
        port => 5044
    }
}
filter {
   csv {
        separator => ","
        columns => ["column1", "column2".....]
    }
    if [column1] == "index_name" {
        grok { match => {"column2" => ["%{DATA:index_name}"]}}
    }
}
# more filters that are not related...
output {
    elasticsearch {
        index => "%{index_name}"
        hosts => [array of ips]
    }
}`

What happened with this config is that I got an index with the name %{index_name}, so clearly I did something wrong.
Just to clarify, the info I implanted inside the csv is just a line that goes like this:
index_name,ed-info
(where ed-info is the actual name I want to give the index...)

Thank you for your help!


(Magnus Bäck) #2

The index_name field was apparently never set. Please show an example message (use a stdout { codec => rubydebug } output).
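
For reference, a minimal sketch of such a debugging output. It can sit alongside the elasticsearch output (or temporarily replace it) and prints every event, with all of its fields, to the Logstash console:

```
output {
  # Print each event with all of its fields so you can check
  # whether index_name was actually set by the filters.
  stdout { codec => rubydebug }
}
```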


(Muhamadli302) #3
{
    "@version" => "1",
    "@timestamp" => "2016-11-27T13:38:40.919Z",
    "beat" => {
        "hostname" => "CNC",
       "name" => "CNC"
    },
    "count" => "1",
    "fields" => nil,
    "input_type" => "log",
    "offset" => 8738,
    "source" => "C:\\logs\\beats_test.csv",
    "type" => "log",
    "host" => "CNC",
    "tags" => [
    [0] "_dateparsefailure"
    ],
    "extracted.ext" => nil
}

(Magnus Bäck) #4

Indeed, there is no index_name field. Unless you tell us what an input event looks like (i.e. what data your filters are trying to process) and what other filters you have, it's very hard to help.


(Muhamadli302) #5

Hi,

I'm using ELK-Forensics (https://github.com/cvandeplas/ELK-forensics) with a few minor changes:

input {
  tcp {
    type => "l2tcsv"
    port => 18005
  }
}

filter {
  if [type] == "l2tcsv" {
    csv { 
       separator => ","
       quote_char => "ª"       # workaround: don't use a quote character as " gives issues if the field contains a "
       columns => ["date","time","timezone","macb","source","sourcetype","eventtype","user","host","short","desc","version","filename","inode","notes","format","extra"]
    }
    if [date] == "date" {
       drop {}  # drop the first line that contains the column names
    }
    if [date] == "index_name" {
       grok { match => {"time" => ["%{DATA:index_name}"]}}
    }
    mutate { merge => ["date", "time"] }       # merge and join need to be in separate mutates
    mutate { merge => ["date", "timezone"] }   # merge and join need to be in separate mutates
    mutate { join => ["date", " "] }           # merge and join need to be in separate mutates
    date { 
      match => ["date", "MM/dd/YYYY HH:mm:ss z" ] 
    }

    # extract macb info
    if ("M" in [macb]) { mutate { add_tag => ["modified"] } }
    if ("A" in [macb]) { mutate { add_tag => ["accessed"] } }
    if ("C" in [macb]) { mutate { add_tag => ["changed"] } }
    if ("B" in [macb]) { mutate { add_tag => ["birth"] } }
    
    # Extract filenames
    if [source] == "FILE" {
      grok { 
        break_on_match => false
        match => ["desc", "(:(?<extracted.path>/.*?))?$",
                  "extracted.path", "(?<extracted.filename>[^/]+?)?$",
                  "extracted.filename", "((\.(?<extracted.ext>[^./]+))?)?$" 
                 ] 
      }
    }
    if [source] == "META" {
      grok { 
        break_on_match => false
        match => ["filename", "(:(?<extracted.path>/.*?))?$",
                  "extracted.path", "(?<extracted.filename>[^/]+?)?$",
                  "extracted.filename", "((\.(?<extracted.ext>[^./]+))?)?$" 
                 ] 
      }
    }
    # Extract urls
    if [source] == "WEBHIST" {
      grok { match => ["desc", "Location: (?<extracted.url>.*?)[ $]"] }
    }
    mutate {
      convert => ["inode", "integer",
                  "version", "integer"] 
      lowercase => ["extracted.ext"]
      remove_field => ["message", "short", "date", "time", "timezone"]
    }
  }
}

output { 
  if [type] == "l2tcsv" {
    elasticsearch {
      index => "%{index_name}"
      hosts =>[array of ips]
    }
  }
}

The data I'm processing is psort CSV output (https://github.com/log2timeline/plaso/wiki), and before sending it to Logstash I add the line I described earlier at the beginning of the file.

If there's any other information you need, please let me know.
Thank you for your help!


(Magnus Bäck) #6

I asked for two things, an example input event and your filter configuration. You have so far only provided your filter configuration.

What I can say so far is that the index_name field will only be populated if the first column of the CSV is "index_name" and the event type is l2tcsv. That doesn't appear to be the case. In the sample output event you provided earlier the type is "log". I also note that that event entered Logstash via a beats input, but there's no beats input in the configuration you posted.
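
To illustrate the condition, here is a hedged sketch (not a tested fix) of how the config could guard against a missing field. When an event has no index_name field, the `%{index_name}` reference is left as literal text, which is how an index literally named %{index_name} ends up being created. The fallback name `l2t-unknown` and the `localhost:9200` host are placeholders made up for this example.

```
filter {
  if [type] == "l2tcsv" {
    if ![index_name] {
      # Hypothetical fallback so the %{index_name} reference always resolves
      mutate { add_field => { "index_name" => "l2t-unknown" } }
    }
  }
}

output {
  if [type] == "l2tcsv" {
    elasticsearch {
      index => "%{index_name}"
      hosts => ["localhost:9200"]   # placeholder; substitute your own hosts
    }
  }
}
```

Keep in mind that fields belong to a single event: a field set while processing the index_name line is not carried over to the other CSV rows.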


(Muhamadli302) #7

I'm sorry, I didn't notice that I had posted the wrong version of the Logstash config.
I will sort things out and send you the correct configs and input.

