Denormalizing with filter in Logstash

Hello dear community,

It's nice to be able to post my first topic.
I just started working with Elasticsearch, so I'm very inexperienced, and after some time I sadly was not able to find a solution by myself.
I need help with setting up my Logstash filter, please.
Basically, the goal is to load Excel data into Elasticsearch. Most commonly the person who created the Excel table structures the data in the form shown below; data loaded from "worldbank.io" has a similar structure.

[image: the source Excel table, with one column per year]
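In plain text, the wide layout looks roughly like this, with one column per year (these are two rows from my test data):

LandID;LandName;RegionID;RegionName;2019;2020;2025;2030
1;Griechenland;0;Total;9006022;10696536;10510196;10303199
1;Griechenland;12;Süd-Aegean Meer;344027;343127;337149;330509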

Uploading this via Logstash with no filters (because I'm not yet familiar with how to use them properly) is not really useful for me, especially when I want to visualize with split series in Kibana. This is the .conf file:

The solution I found is to structure the Excel data in the following form:
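Roughly one row per region and year, presumably like this ("Jahr" is just an illustrative name for the year column; "Einwohner" is the residents column):

LandID;LandName;RegionID;RegionName;Jahr;Einwohner
1;Griechenland;0;Total;2019;9006022
1;Griechenland;0;Total;2020;10696536
1;Griechenland;0;Total;2025;10510196
1;Griechenland;0;Total;2030;10303199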

This yielded the wanted result when uploading with the default settings, and the mapping seemed fine. I used this .conf file (the column "Einwohner" means "resident", to clear up the discrepancy between the tables and the .conf file):

Since I don't want to re-format each Excel table by hand, the question is: how can I set up the filter in my .conf file so that the data gets structured this way automatically?
Thanks in advance to all!
Best regards
Tim

PS: Below are the resulting mappings; please let me know if more information is needed:

If you want to split an event into multiple events you would use a split filter, which means you need to construct an array containing the data you want. If you have data like

A,1,2,3,4
B,5,6,7,8

You could try something like

    csv { columns => [ "Id", "2019", "2020", "2021", "2022" ] }
    ruby {
        code => '
            a = []
            [ "2019", "2020", "2021", "2022" ].each { |x|
                a << { "year" => x, "residents" => event.get(x) }
            }
            event.set("a", a)
        '
    }
    split { field => "a" }
    mutate { rename => { "[a][residents]" => "residents" "[a][year]" => "year" } remove_field => [ "a" ] }
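With the sample data above, each input line would then come out as four events, roughly like this (only the relevant fields shown; at this point the values are still strings):

{ "Id" => "A", "year" => "2019", "residents" => "1" }
{ "Id" => "A", "year" => "2020", "residents" => "2" }
{ "Id" => "A", "year" => "2021", "residents" => "3" }
{ "Id" => "A", "year" => "2022", "residents" => "4" }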

Hi Badger,

First of all, thank you very much and happy new year!
I commented my thought process in your code snippet below, since I was not able to fully deconstruct it.

csv { columns => [ "Id", "2019", "2020", "2021", "2022" ] }
// Here I need to list every column of my .csv file?

    ruby {        /* So this simply means that the following part is written and
                     executed in Ruby? Do I need to install Ruby then, and
                     are other languages possible for this operation? */

        code => '
            a = [] 			// Simply initializing an array named a, right?
            [ "2019", "2020", "2021", "2022" ].each
/* I suppose here I define the columns of the .csv that the data is taken from,
   put them into the array, and loop through them? */

/* I am not quite sure what x belongs to. Am I right to assume that x gets
   initialized with a new column named "year" and is matched with the
   corresponding "residents" field? Something like a VLOOKUP in Excel? */
            { |x| a << { "year" => x, "residents" => event.get(x) }
            }
            event.set("a", a)
/* I do not understand why this function takes both "a" and a.
   If a is the array, what does "a" hold as values? */
        '
    }
    split { field => "a" } // Splitting the array to fit into JSON, but I'm not quite sure how exactly.


// To make things clearer, could you please sketch out the structure of the array we created?
    mutate { rename => { "[a][residents]" => "residents" "[a][year]" => "year" } remove_field => [ "a" ] }

Best regards
Tim

    # ruby is a filter that is bundled with logstash by default. It allows you to
    # run arbitrary ruby code
    ruby { 
        code => '
            # This creates an empty array in a local variable called a
            a = []
            # .each iterates over the array that contains the field names and
            # executes the code block with each value in turn. Thus the first
            # time the code block executes x will be "2019", the second time
            # it will be "2020" etc. The array should contain your column names.
            [ "2019", "2020", "2021", "2022" ].each { |x|
                # The << operator appends a value at the end of the array.
                # The value it appends will be a hash in which year is 
                # (e.g.) "2019" and residents will be the value of the "2019"
                # field on the event (so 9006022 for the first row of data in
                # your example). It is a hash because it is surrounded by {}
                a << { "year" => x, "residents" => event.get(x) }
            }
            # This creates a field on the event which contains the array
            # of four hashes, one for each year.
            # The name "a" is arbitrary, since the field gets removed later.
            # It has to be called something so that we can split it.
            event.set("a", a)
        '
    }
    # This splits the event into four events, each of which has one
    # of the members of the array of hashes.
    split { field => "a" }
    # This moves the residents and year to the top level of the event,
    # allowing us to remove the temporary holding place they were in.
    mutate { rename => { "[a][residents]" => "residents" "[a][year]" => "year" } remove_field => [ "a" ] }
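To sketch the array structure you asked about: with your actual columns (2019, 2020, 2025 and 2030) and the first row of your data, the temporary [a] field would hold roughly

    [
      { "year" => "2019", "residents" => "9006022" },
      { "year" => "2020", "residents" => "10696536" },
      { "year" => "2025", "residents" => "10510196" },
      { "year" => "2030", "residents" => "10303199" }
    ]

and the split filter then emits one event per element of that array.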

Hi Badger,

thank you very much again!
Your great explanation makes it a lot clearer.
I can see the code is mostly working.
There seems to be only a minor issue.

input {
  file {
    path => "/home/estest/csv-data/Einwohner_test.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ";"	  
      skip_header => "true"
      columns => ["LandID","LandName","RegionID","RegionName","2019","2020","2025","2030"]
  }

  ruby {
        code => '
            a = []
            [ "2019", "2020", "2025", "2030" ].each { |x|
                a << { "year" => x, "residents" => event.get(x) }
            }
            event.set("a", a)
        '
    }
    split { field => "a" }
    mutate { rename => { "[a][residents]" => "residents" "[a][year]" => "year" } 
             remove_field => [ "a" ]
             convert => {residents => "integer" year => "integer"}
         }
	
	
}
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "demo-db-split"
  }

stdout {}

}

As you can see, for each entry the year and residents pair is correct, but the other year columns are not getting dropped. Do I have to use some sort of conditional drop, something like
"if field matches 20* then drop"?

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jruby.ext.openssl.SecurityHelper (file:/tmp/jruby-2121/jruby9617472003062199941jopenssl.jar) to field java.security.MessageDigest.provider
WARNING: Please consider reporting this to the maintainers of org.jruby.ext.openssl.SecurityHelper
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2021-01-05 09:23:02.695 [main] runner - Starting Logstash {"logstash.version"=>"7.10.1", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 11.0.8+10 on 11.0.8+10 +indy +jit [linux-x86_64]"}
[WARN ] 2021-01-05 09:23:03.550 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2021-01-05 09:23:11.230 [Converge PipelineAction::Create<main>] Reflections - Reflections took 163 ms to scan 1 urls, producing 23 keys and 47 values
[WARN ] 2021-01-05 09:23:12.603 [Converge PipelineAction::Create<main>] elasticsearch - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[INFO ] 2021-01-05 09:23:13.920 [[main]-pipeline-manager] elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[WARN ] 2021-01-05 09:23:14.350 [[main]-pipeline-manager] elasticsearch - Restored connection to ES instance {:url=>"http://localhost:9200/"}
[INFO ] 2021-01-05 09:23:14.699 [[main]-pipeline-manager] elasticsearch - ES Output version determined {:es_version=>7}
[WARN ] 2021-01-05 09:23:14.701 [[main]-pipeline-manager] elasticsearch - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[INFO ] 2021-01-05 09:23:14.818 [[main]-pipeline-manager] elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
[INFO ] 2021-01-05 09:23:14.975 [Ruby-0-Thread-5: :1] elasticsearch - Using a default mapping template {:es_version=>7, :ecs_compatibility=>:disabled}
[INFO ] 2021-01-05 09:23:15.121 [Ruby-0-Thread-5: :1] elasticsearch - Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[INFO ] 2021-01-05 09:23:15.137 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, "pipeline.sources"=>["/etc/logstash/conf.d/pop-gr-split.conf"], :thread=>"#<Thread:0x72642c00 run>"}
[INFO ] 2021-01-05 09:23:17.435 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>2.29}
[INFO ] 2021-01-05 09:23:18.030 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2021-01-05 09:23:18.131 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2021-01-05 09:23:18.200 [[main]<file] observingtail - START, creating Discoverer, Watch with file and sincedb collections
[INFO ] 2021-01-05 09:23:18.771 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[WARN ] 2021-01-05 09:23:19.533 [[main]<file] plain - Received an event that has a different character encoding than you configured. {:text=>"1;Griechenland;12;S\\xFCd-Aegean Meer;344027;343127;337149;330509\\r", :expected_charset=>"UTF-8"}
{
          "year" => 2019,
      "RegionID" => "0",
        "LandID" => "1",
    "RegionName" => "Total",
          "2020" => "10696536",
       "message" => "1;Griechenland;0;Total;9006022;10696536;10510196;10303199\r",
      "@version" => "1",
          "2030" => "10303199",
          "path" => "/home/estest/csv-data/Einwohner_test.csv",
          "2019" => "9006022",
          "2025" => "10510196",
          "host" => "es1",
    "@timestamp" => 2021-01-05T09:23:19.498Z,
      "LandName" => "Griechenland",
     "residents" => 9006022
}
{
          "year" => 2020,
      "RegionID" => "0",
        "LandID" => "1",
    "RegionName" => "Total",
          "2020" => "10696536",
       "message" => "1;Griechenland;0;Total;9006022;10696536;10510196;10303199\r",
      "@version" => "1",
          "2030" => "10303199",
          "path" => "/home/estest/csv-data/Einwohner_test.csv",
          "2019" => "9006022",
          "2025" => "10510196",
          "host" => "es1",
    "@timestamp" => 2021-01-05T09:23:19.498Z,
      "LandName" => "Griechenland",
     "residents" => 10696536
}

Best regards

Tim

Regarding my last answer :slight_smile:

So modifying the remove_field expression like this did not work:

remove_field => [ "a" , "20_" ]

but specifying each year explicitly worked, like this:

 remove_field => [ "a", "2019", "2020", "2025", "2030" ]

and gave this output:

 "LandID" => "1",
          "host" => "es1",
      "@version" => "1",
          "year" => 2019,
       "message" => "1;Griechenland;0;Total;9006022;10696536;10510196;10303199\r",
    "@timestamp" => 2021-01-05T10:42:18.192Z,
          "path" => "/home/estest/csv-data/Einwohner_test.csv",
     "residents" => 9006022,
      "RegionID" => "0",
      "LandName" => "Griechenland",
    "RegionName" => "Total"

So I assume simple wildcards like the one I tried above are not possible?

If you need to drop fields that match a pattern you could use a prune filter

prune { blacklist_names => [ "^20" ] }
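As a minimal sketch of where that could go, assuming you just want to drop the leftover per-year columns, it can sit right after the split in the filter block:

    split { field => "a" }
    prune { blacklist_names => [ "^20" ] }
    mutate { rename => { "[a][residents]" => "residents" "[a][year]" => "year" }
             remove_field => [ "a" ]
             convert => { "residents" => "integer" "year" => "integer" } }

That removes the 2019/2020/2025/2030 fields without having to list them by hand.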

Badger, you are the man!
Thank you very much again. You helped me out, or rather solved my problem, which is a great advance for my project!
