Adding to a CSV file data-set some metadata from a Json file

Hello,

I am trying to do the following:

what I have

  • I have a csv file that contains a data-set, LG_data.csv
OWNER;CATEGORY;TASK;ID;TEN_ID;CHANNEL_ID;CODE;PLANNED_START;TIME_STORED;TIME_QUEUED;TIME_STARTED;TIME_DONE;RUNTIME;WAIT_TIME;DURATION
loadtestnonxa_res;;CW_SCHEDULED_MESSAGE;1520365718375;1;1;BOOTSTRAP;2018-03-06 19:48:38.375;2018-03-06 19:48:38.375;2018-03-06 19:48:38.748;2018-03-06 19:48:38.796;2018-03-06 19:48:47.513;8717;421;9138
loadtestnonxa_res;;CW_SCHEDULED_MESSAGE;1520365718376;1;2;BOOTSTRAP;2018-03-06 19:48:40.495;2018-03-06 19:48:40.495;2018-03-06 19:48:40.793;2018-03-06 19:48:40.819;2018-03-06 19:48:49.883;9064;324;9388
  • I have a json file that contains metadata for this data-set LG_data_metadata.json
{
  "teamcity.build.id": "9182737475",
  "env.loadtest-usecases.profile": "run-uc1-small",
  "project-config.version": "6.3.0-SNAPSHOT",
  "loadtest-usecases.branch.name": "feature/release-6.3.0"
}

what I'm trying to do

After reading the csv data, I use grok to match the filename and use it in the next input/file parser to read the json file (name convention).

Then I use grok again to match the metadata in the json file and add it (mutate) as a field.

what I expect

  • each data-item has the added field with the corresponding value of the match field

Here is my configuration:

# Pipeline as originally posted; this is the configuration the question is about.
# First input: every *.csv file under the data directory, tagged with type "csv".
input {
   file {
      path => "/devevelop/scratch/data/*.csv"
      type => "csv"
      start_position => "beginning"    
      sincedb_path => "/dev/null"      
  }
 }

# Capture the part of "path" between the data directory and ".csv" into the
# "filename" field (the posted output shows "filename" => "LG_data").
filter{
    grok{
      match => ["path","/devevelop/scratch/data/%{GREEDYDATA:filename}.csv"]
    }
}

# Second input: meant to read "<filename>_metadata.json" for the CSV matched above.
# NOTE(review): per the reply in this thread, inputs cannot read from filters, so
# %{filename} is not expected to be resolved in this path.
input {
  file {
      path => "/devevelop/scratch/data/%{filename}_metadata.json"
      type => "json"
      start_position => "beginning"    
      sincedb_path => "/dev/null" 
        codec => multiline {
        pattern => '.*'
        negate => true
        what => previous
    }
  }
}

filter {
  # For events of type "json": try to capture each of the four metadata keys
  # from "message"; events that match none of the patterns are dropped.
  if [type]== "json" {
      grok {
          match =>  {"message" => "\"teamcity.build.id\":%{GREEDYDATA:teamcity.build.id}"}
          match =>  {"message" => "\"env.loadtest-usecases.profile\":%{GREEDYDATA:env.loadtest-usecases.profile}"}
          match =>  {"message" => "\"project-config.version\":%{GREEDYDATA:project-config.version}"}
          match =>  {"message" => "\"loadtest-usecases.branch.name\":%{GREEDYDATA:loadtest-usecases.branch.name}"}
      }
      if "_grokparsefailure" in [tags] {
        drop { }
      }
  }
  # This mutate sits outside the type check above, so it runs for every event,
  # CSV rows included. In the posted output the CSV events end up with the
  # literal text "%{teamcity.build.id}" because they carry no such field.
  mutate {
    add_field => { "teamcity.build.id" => "%{teamcity.build.id}" }
  }
}

filter {
  # For events of type "csv": split the ";"-separated row into named columns
  # and convert the timestamp and duration columns.
  # NOTE(review): "RELEASE" is listed as a 16th column, but the sample header
  # above has only 15 columns - confirm whether it is intended.
  if [type]== "csv" {
    csv {        
        separator => ";"     
        columns => ["OWNER","CATEGORY","TASK","ID","TEN_ID","CHANNEL_ID","CODE","PLANNED_START","TIME_STORED","TIME_QUEUED","TIME_STARTED","TIME_DONE","RUNTIME","WAIT_TIME","DURATION", "RELEASE"]
        convert => {
            "PLANNED_START" => "date_time"
            "TIME_STORED" => "date_time"
            "TIME_QUEUED" => "date_time"
            "TIME_STARTED" => "date_time"
            "TIME_DONE" => "date_time"
            "DURATION" => "integer"
            "WAIT_TIME" => "integer"
            "RUNTIME" => "integer"         
      }
    }
  }
}

# Print every resulting event to the console for inspection.
output {
  stdout { 
       codec => rubydebug 
    }
}

But I get:

...
{
              "message" => "loadtestnonxa_res;;CW_SCHEDULED_MESSAGE;1520365718377;1;3;BOOTSTRAP;2018-03-06 19:48:42.587;2018-03-06 19:48:42.587;2018-03-06 19:48:42.826;2018-03-06 19:48:42.882;2018-03-06 19:48:53.503;10621;295;10916",
             "@version" => "1",
           "@timestamp" => "2018-03-29T15:35:23.388Z",
                 "path" => "/devevelop/scratch/data/LG_data.csv",
                 "host" => "elGuapo",
                 "type" => "csv",
             "filename" => "LG_data",
    "teamcity.build.id" => "%{teamcity.build.id}",
                "OWNER" => "loadtestnonxa_res",
             "CATEGORY" => nil,
                 "TASK" => "CW_SCHEDULED_MESSAGE",
                   "ID" => "1520365718377",
               "TEN_ID" => "1",
           "CHANNEL_ID" => "3",
                 "CODE" => "BOOTSTRAP",
        "PLANNED_START" => "2018-03-06 19:48:42.587",
          "TIME_STORED" => "2018-03-06 19:48:42.587",
          "TIME_QUEUED" => "2018-03-06 19:48:42.826",
         "TIME_STARTED" => "2018-03-06 19:48:42.882",
            "TIME_DONE" => "2018-03-06 19:48:53.503",
              "RUNTIME" => 10621,
            "WAIT_TIME" => 295,
             "DURATION" => 10916
}
... 

Problem

  1. The field teamcity.build.id is not resolved to the expected value. (it works for filename)

The json part works. I tried it in a single-file configuration:

# Stand-alone test of the JSON-metadata part: the JSON file is fed through stdin
# using the same multiline codec settings as the file input in the full pipeline.
input {
  stdin {
      codec => multiline {
        pattern => '.*'
        negate => true
        what => previous
    }
  }
}

# Try to capture each of the four metadata keys from "message"; events that
# match none of the patterns are dropped. As the output below shows, each
# captured value still includes the leading space, the quotes and any trailing comma.
filter {
    grok {
        match =>  {"message" => "\"teamcity.build.id\":%{GREEDYDATA:teamcity.build.id}"}
        match =>  {"message" => "\"env.loadtest-usecases.profile\":%{GREEDYDATA:env.loadtest-usecases.profile}"}
        match =>  {"message" => "\"project-config.version\":%{GREEDYDATA:project-config.version}"}
        match =>  {"message" => "\"loadtest-usecases.branch.name\":%{GREEDYDATA:loadtest-usecases.branch.name}"}
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
}

# Print every resulting event to the console for inspection.
output {
  stdout { codec => rubydebug }
}

And the result is:

{
           "@timestamp" => "2018-03-29T15:50:55.857Z",
              "message" => "  \"teamcity.build.id\": \"9182737475\",",
             "@version" => "1",
                 "host" => "elGuapo",
    "teamcity.build.id" => " \"9182737475\","
}
{
                       "@timestamp" => "2018-03-29T15:50:55.857Z",
                          "message" => "  \"env.loadtest-usecases.profile\": \"run-uc1-small\",",
                         "@version" => "1",
                             "host" => "elGuapo",
    "env.loadtest-usecases.profile" => " \"run-uc1-small\","
}
{
                "@timestamp" => "2018-03-29T15:50:55.858Z",
                   "message" => "  \"project-config.version\": \"6.3.0-SNAPSHOT\",",
                  "@version" => "1",
                      "host" => "elGuapo",
    "project-config.version" => " \"6.3.0-SNAPSHOT\","
}
{
                       "@timestamp" => "2018-03-29T15:50:55.858Z",
                          "message" => "  \"loadtest-usecases.branch.name\": \"feature/release-6.3.0\"",
                         "@version" => "1",
                             "host" => "elGuapo",
    "loadtest-usecases.branch.name" => " \"feature/release-6.3.0\""
}

Questions

  1. Can I use a grok match as part of the file path?
  2. If yes, shouldn't it be able to access the value of the fields when I add it to every data-item of the next filter (like with filename)?
  3. What am I doing wrong?

Within a Logstash pipeline, all inputs are used to generate events, which are fed through all filters to outputs; we cannot have inputs that read from filters.

  • How are your json metadata being provided? Are they relatively static?
  • Once a csv file is written, does the JSON metadata change?
  • Are the CSV files appended to, or are they written all-at-once?

Ok, I get it. That is not going to work.

  • The metadata is provided as a separate file during a test run. It is for that run static.
  • The CSV files are the samples of a test run. It can be mapped to a static ID (test-run-id (build-id)).
  • The CSV files are written all at once for a specific test-run.

I understand now why this will not work. I think I will stitch the build-id into the CSV filename, and make it a field before loading it into ES.

And then write a separate filter for the metadata with the same build-id.

I am sure there is some way of correlating that information in ES after import. I just have to find that out. I'm not very fluent in the ES-Stack.

At the end, I would like to make a graph showing all samples for example for DURATION (separated by build-id) of a specific profile (from the metadata like run-uc1-small).

Any hints or ideas on how to do that are welcome (before I fall down another rabbit hole).

Thanks for your answer though.

Would it be possible to inject the metadata into a database before placing the CSV file? If so, the JDBC Streaming filter may be a way to inject the data mid-stream, and it has a LRU cache built-in, so it wouldn't hammer the database too hard when retrieving metadata for the same job ID repeatedly.

I do see a JDBC driver for SQLite which could be a good option if you don't want to run a database server just for this.

Thanks. That looks like a good idea. I will look into it. However I am currently checking if I can use a filtered query after having both (CSV and Metadata) in ES as separate datasets. I'll keep this post updated on success/failure.

Thanks to the idea of @yaauie I came to a solution (more of a workaround), since I am no longer merging two files. The metadata is taken directly from the TeamCity REST API via the build ID (instead of a jdbc filter). My test filter looks like this (the CSV file is piped to stdin and the build ID is already in the event):

# Final pipeline: CSV rows arrive on stdin, each row already carrying its build id.
input {
  stdin {}
}


filter {
  # Split the ";"-separated row into named columns (the last column, BUILDID, is
  # used for the REST lookup further down) and convert timestamps and durations.
  csv {
      skip_empty_rows => true
      skip_header => true
      separator => ";"     
      columns => ["OWNER","CATEGORY","TASK","ID","TEN_ID","CHANNEL_ID","CODE","PLANNED_START","TIME_STORED","TIME_QUEUED","TIME_STARTED","TIME_DONE","RUNTIME","WAIT_TIME","DURATION", "RELEASE", "BUILDID"]
      convert => {
          "PLANNED_START" => "date_time"
          "TIME_STORED" => "date_time"
          "TIME_QUEUED" => "date_time"
          "TIME_STARTED" => "date_time"
          "TIME_DONE" => "date_time"
          "DURATION" => "integer"
          "WAIT_TIME" => "integer"
          "RUNTIME" => "integer"        
    }    
   
  }

 # Parse PLANNED_START and TIME_DONE with the "yyyy-MM-dd HH:mm:ss.SSS" pattern
 # and store the results in the "@start" and "@done" fields.
 date {
    match => ["PLANNED_START", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@start"
  }
 date {
    match => ["TIME_DONE", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@done"
  }
}

# Fetch the build's metadata from the TeamCity REST API, using the event's
# BUILDID in the URL; "fields" limits which attributes are requested and the
# response is stored under the "metadata" field.
filter { 
    rest { 
        request => { 
            url => "http://teamcity.mobilex.intra/app/rest/builds/%{BUILDID}" 
            method => "get" 
            headers => { 
                "Content-Type" => "application/json" 
                "Accept" => "application/json"
                }                         
            auth => {
                    user => "****"
                    password => "****"
            }
            params => {
                "fields" => "id,status,state,branchName,triggered(user(name)),startDate,finishDate,properties(property(name,value))"
            } 
        }        
        json => true 
        target => 'metadata' 
    } 
} 

filter {
    # Copy the branch name out of the nested response into its own field.
    mutate {
        add_field => {
            "metadata.branchName" => "%{[metadata][branchName]}"                      
        }        
    }
    # Run the external Ruby script (presumably the filter(event) function shown
    # further down in this post) to flatten the property list into fields.
    ruby {
        path => "./map_properties.rb"
    }
    
    # The nested response is no longer needed once the values have been copied out.
    mutate {
        remove_field => "metadata"
    }
}

# Print every resulting event to the console for inspection.
output {
    stdout { 
        codec => rubydebug 
    }
}

I had to make use of the rest filter (which comes with a warning that it is not well supported, but then... open source) and write a ruby filter, because the json filter cannot handle arrays of key/value pairs very well:

# Entry point called by the Logstash ruby filter for each event.
#
# Reads the list stored at "[metadata][properties][property]" and, for every
# entry that has both a "name" and a non-nil "value", sets a top-level field
# named "metadata.<name>" to that value.
#
# @param event [#get, #set] the event being processed
# @return [Array] a one-element array containing the same event
def filter(event)
  # The field is absent when the lookup returned no properties; treat that as
  # "nothing to map" instead of calling #each on nil.
  properties = event.get("[metadata][properties][property]") || []

  properties.each do |property|
    name = property["name"]
    value = property["value"]
    # Skip entries that cannot form a field (no name) or carry no value.
    next if name.nil? || value.nil?

    event.set("metadata.#{name}", value)
  end

  [event]
end

Which works for me so far.

So we can close this one as far as I'm concerned.