Data relation using a Logstash conf file

I have multiple CSV files in a data folder, with one column in common that relates the data between the files.
I created a Logstash conf file and ran it manually, but Logstash shuts down.

Sample data:
File 1: sample_orders.csv

id,product,quantity
2,Apple,5
1,Banana,10
3,Orange,3

File 2: sample_users.csv

1,John Doe,johndoe@example.com
2,Jane Smith,janesmith@example.com
3,Robert Johnson,robertjohnson@example.com

Logstash conf file: nw_combined.conf

input {
  file {
    path => [
      "/data/sample_orders.csv",
      "/data/sample_users.csv"
    ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "UTF-8"
    }
  }
}

filter {
  if [source] =~ /sample_orders\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "product", "quantity"]
    }
    mutate {
      convert => {
        "id" => "integer"
        "quantity" => "integer"
      }
    }
    ruby {
      code => "
        event.set('map', {}) unless event.get('map')
        event.get('map')['file1'] ||= {}
        event.get('map')['file1'][event.get('id')] ||= {}
        event.get('map')['file1'][event.get('id')]['product'] = event.get('product')
        event.get('map')['file1'][event.get('id')]['quantity'] = event.get('quantity')
        event.cancel()
      "
    }
  }

  if [source] =~ /sample_users\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "name", "email"]
    }
    ruby {
      code => "
        event.set('map', {}) unless event.get('map')
        event.get('map')['file2'] ||= {}
        event.get('map')['file2'][event.get('id')] ||= {}
        event.get('map')['file2'][event.get('id')]['name'] = event.get('name')
        event.get('map')['file2'][event.get('id')]['email'] = event.get('email')
        event.cancel()
      "
    }
  }

  ruby {
    code => "
      event.get('map')&.each do |file, data|
        data.each do |id, fields|
          event = LogStash::Event.new(fields)
          event.set('id', id)
          event.set('source', file)
          event.remove('@timestamp')
          event.remove('@version')
          event.tag('aggregated')
          yield event
        end
      end
    "
  }
}

output {
  if 'aggregated' in [tags] {
    stdout {
      codec => rubydebug
    }
  }
}

error:

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2023-07-19 13:48:19.807 [main] runner - Starting Logstash {"logstash.version"=>"7.17.11", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.19+7 on 11.0.19+7 +indy +jit [linux-x86_64]"}
[INFO ] 2023-07-19 13:48:19.870 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[WARN ] 2023-07-19 13:48:21.249 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2023-07-19 13:48:28.546 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[ERROR] 2023-07-19 13:48:31.752 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"=>\" at line 21, column 8 (byte 399) after filter {\n  csv {\n    separator => \",\"\n    columns => [\"id\", \"product\", \"quantity\"]\n    skip_header => true\n    if ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:392:in `block in converge_state'"]}
[INFO ] 2023-07-19 13:48:32.058 [LogStash::Runner] runner - Logstash shut down.

I checked for configuration issues but could not find any.

Thanks
Vinod

That is not the configuration you are running. The error message shows that you have a conditional inside a csv filter, which is unsupported.

:message=>"Expected one of [ \t\r\n], "#", "=>" at line 21, column 8 (byte 399) after

filter {
    csv {
        separator => ","
        columns => ["id", "product", "quantity"]
        skip_header => true
        if "
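
For reference, a conditional has to wrap a plugin block; it cannot appear inside one. A rough sketch of the corrected shape (note it tests [path], which the file input actually sets on these events, rather than [source]):

filter {
  # the conditional wraps the csv filter; it must not go inside the csv block
  if [path] =~ /sample_orders\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "product", "quantity"]
    }
  }
}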

Also, none of your ruby code will do anything useful: it stores the data on the event and then cancels the event, at which point that data is discarded. You should use an aggregate filter, which persists a hash across events and passes it to your code blocks as "map".
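
As a rough sketch of the idea (not a drop-in config; the field names are taken from your sample data):

filter {
  aggregate {
    task_id => "%{id}"
    code => "
      # 'map' is kept by the filter and shared by every event with the same id
      map['product'] ||= event.get('product')
      map['name'] ||= event.get('name')
      event.cancel
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "id"
    timeout => 5
  }
}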

Hi @Badger

I have used the aggregate filter and I am getting the following error.

[root@arb_test_vm1 logstash]# bin/logstash -f /etc/logstash/conf.d/combined.conf
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2023-07-24 02:11:29.888 [main] runner - Starting Logstash {"logstash.version"=>"7.17.11", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.19+7 on 11.0.19+7 +indy +jit [linux-x86_64]"}
[INFO ] 2023-07-24 02:11:29.899 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
[WARN ] 2023-07-24 02:11:30.792 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2023-07-24 02:11:34.086 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[INFO ] 2023-07-24 02:11:39.209 [Converge PipelineAction::Create<main>] Reflections - Reflections took 254 ms to scan 1 urls, producing 119 keys and 419 values
[WARN ] 2023-07-24 02:11:41.851 [Converge PipelineAction::Create<main>] plain - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2023-07-24 02:11:44.350 [Converge PipelineAction::Create<main>] plain - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2023-07-24 02:11:44.560 [Converge PipelineAction::Create<main>] file - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2023-07-24 02:11:44.783 [Converge PipelineAction::Create<main>] csv - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[WARN ] 2023-07-24 02:11:45.576 [Converge PipelineAction::Create<main>] csv - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[ERROR] 2023-07-24 02:11:47.004 [[main]-pipeline-manager] javapipeline - Pipeline error {:pipeline_id=>"main", :exception=>#<LogStash::ConfigurationError: Aggregate plugin: For task_id pattern '%{id}', there are more than one filter which defines timeout options. All timeout options have to be defined in only one aggregate filter per task_id pattern. Timeout options are : timeout, inactivity_timeout, timeout_code, push_map_as_event_on_timeout, push_previous_map_as_event, timeout_timestamp_field, timeout_task_id_field, timeout_tags>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:103:in `block in register'", "org/jruby/ext/thread/Mutex.java:164:in `synchronize'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:97:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:75:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:233:in `block in register_plugins'", "org/jruby/RubyArray.java:1821:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:232:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:599:in `maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:245:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:190:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:142:in `block in start'"], "pipeline.sources"=>["/etc/logstash/conf.d/combined.conf"], :thread=>"#<Thread:0x7d837166 run>"}
[INFO ] 2023-07-24 02:11:47.008 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[ERROR] 2023-07-24 02:11:47.040 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
[INFO ] 2023-07-24 02:11:47.370 [LogStash::Runner] runner - Logstash shut down.

Logstash aggregate code:

input {
  file {
    path => [
      "/data/sample_orders.csv",
      "/data/sample_users.csv"
    ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "UTF-8"
    }
  }
}

filter {
  if [source] =~ /sample_orders\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "product", "quantity"]
    }
    mutate {
      convert => {
        "id" => "integer"
        "quantity" => "integer"
      }
    }
    aggregate {
      task_id => "%{id}"
      code => "
        map['id'] ||= event.get('id')
        map['product'] ||= event.get('product')
        map['quantity'] ||= event.get('quantity')
        event.cancel()
      "
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "id"
      timeout => 5
    }
  }

  if [source] =~ /sample_users\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "name", "email"]
    }
    mutate {
      convert => {
        "id" => "integer"
      }
    }
    aggregate {
      task_id => "%{id}"
      code => "
        map['name'] ||= event.get('name')
        map['email'] ||= event.get('email')
        event.cancel()
      "
      push_previous_map_as_event => true
      timeout_task_id_field => "id"
      timeout => 5
    }
  }
  ruby {
    code => "
      event.set('id', event.get('id')) if event.get('id')
      event.set('product', event.get('product')) if event.get('product')
      event.set('quantity', event.get('quantity')) if event.get('quantity')
      event.set('name', event.get('name')) if event.get('name')
      event.set('email', event.get('email')) if event.get('email')
      event.remove('source')
    "
  }
}

output {
 #elasticsearch {
 #hosts => ["localhost:9200"] # Update with the appropriate Elasticsearch host and port
 #index => "combined_data_nw" # Replace with the desired index name
 #document_id => "%{id}" # Use the 'id' field as the document ID in Elasticsearch
 #}
  stdout {
    codec => rubydebug
  }
}

The error message pretty much tells you what is wrong, though: all of the timeout options have to be defined in only one aggregate filter per task_id pattern, and you have set them on both.
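
One way to satisfy that constraint is to keep your two aggregate blocks but move every timeout option onto just one of them, roughly like this (conditionals and csv filters omitted):

    # inside the sample_orders.csv branch
    aggregate {
      task_id => "%{id}"
      code => "
        map['product'] ||= event.get('product')
        map['quantity'] ||= event.get('quantity')
        event.cancel()
      "
      # all timeout options live on this one filter only
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "id"
      timeout => 5
    }

    # inside the sample_users.csv branch: no timeout options at all
    aggregate {
      task_id => "%{id}"
      code => "
        map['name'] ||= event.get('name')
        map['email'] ||= event.get('email')
        event.cancel()
      "
    }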

Hi @Badger

I have removed the timeout options from the code, but it is not combining the data from the two files. Please find the output below.

conf file

input {
  file {
    path => [
      "/data/sample_orders.csv",
      "/data/sample_users.csv"
    ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "UTF-8"
    }
  }
}

filter {
  if [source] =~ /sample_orders\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "product", "quantity"]
    }
    mutate {
      convert => {
        "id" => "integer"
        "quantity" => "integer"
      }
    }
    aggregate {
      task_id => "%{id}"
      code => "
        map['id'] ||= event.get('id')
        map['product'] ||= event.get('product')
        map['quantity'] ||= event.get('quantity')
        event.cancel()
      "
    }
  }

  if [source] =~ /sample_users\.csv/ {
    csv {
      separator => ","
      skip_header => true
      columns => ["id", "name", "email"]
    }
    mutate {
      convert => {
        "id" => "integer"
      }
    }
    aggregate {
      task_id => "%{id}"
      code => "
        map['name'] ||= event.get('name')
        map['email'] ||= event.get('email')
        event.cancel()
      "
    }
  }
  ruby {
    code => "
      event.set('id', event.get('id')) if event.get('id')
      event.set('product', event.get('product')) if event.get('product')
      event.set('quantity', event.get('quantity')) if event.get('quantity')
      event.set('name', event.get('name')) if event.get('name')
      event.set('email', event.get('email')) if event.get('email')
      event.remove('source')
    "
  }
}

output {
 #elasticsearch {
 #hosts => ["localhost:9200"] # Update with the appropriate Elasticsearch host and port
 #index => "combined_data_nw" # Replace with the desired index name
 #document_id => "%{id}" # Use the 'id' field as the document ID in Elasticsearch
 #}
  stdout {
    codec => rubydebug
  }
}

output:

{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "id,name,email",
    "@timestamp" => 2023-07-27T06:08:01.279Z,
          "path" => "/data/sample_users.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "id,product,quantity",
    "@timestamp" => 2023-07-27T06:08:01.672Z,
          "path" => "/data/sample_orders.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "2,Apple,5",
    "@timestamp" => 2023-07-27T06:08:01.685Z,
          "path" => "/data/sample_orders.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "1,Banana,10",
    "@timestamp" => 2023-07-27T06:08:01.686Z,
          "path" => "/data/sample_orders.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "3,Orange,3",
    "@timestamp" => 2023-07-27T06:08:01.686Z,
          "path" => "/data/sample_orders.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "",
    "@timestamp" => 2023-07-27T06:08:01.686Z,
          "path" => "/data/sample_orders.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "1,John Doe,johndoe@example.com",
    "@timestamp" => 2023-07-27T06:08:01.421Z,
          "path" => "/data/sample_users.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "2,Jane Smith,janesmith@example.com",
    "@timestamp" => 2023-07-27T06:08:01.424Z,
          "path" => "/data/sample_users.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "3,Robert Johnson,robertjohnson@example.com",
    "@timestamp" => 2023-07-27T06:08:01.424Z,
          "path" => "/data/sample_users.csv"
}
{
          "host" => "arb_test_vm1",
      "@version" => "1",
       "message" => "",
    "@timestamp" => 2023-07-27T06:08:01.425Z,
          "path" => "/data/sample_users.csv"
}

Thanks
Vinod

Try

    if [@metadata][path] =~ /sample_users.csv$/ { csv { columns => ["id", "name", "email"] } }
    if [@metadata][path] =~ /sample_orders.csv$/ { csv { skip_header => true columns => ["id", "product", "quantity"] } }
    aggregate {
        task_id => "%{id}"
        code => '
            map["product"] ||= event.get("product")
            map["quantity"] ||= event.get("quantity")
            map["name"] ||= event.get("name")
            map["email"] ||= event.get("email")
            event.cancel
        '
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "id"
        timeout => 5
    }

which will produce events like

{
"@timestamp" => 2023-07-28T00:11:20.475338206Z,
     "email" => "robertjohnson@example.com",
  "quantity" => "3",
  "@version" => "1",
   "product" => "Orange",
        "id" => "3",
      "name" => "Robert Johnson"
}
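
Two notes on this: the conditionals test [@metadata][path] rather than [source] (your earlier output shows the events carry a path field but no source field, which is why your csv filters never matched), and since the aggregate filter keeps its maps in a single in-memory store, the pipeline should be run with a single worker, for example:

bin/logstash -w 1 -f /etc/logstash/conf.d/combined.conf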

Hi @Badger

Super, it works like a charm. Thanks a lot.

Could you please suggest any samples for learning these kinds of scenarios?

Thanks
Vinod
