Running Pipeline Manually

Quick Question for all:
When I want to run my Logstash pipeline, do I execute pipelines.yml, or just start up Logstash?

Based on this documentation, I believe I should just be starting up Logstash:

" This file is formatted in YAML and contains a list of dictionaries, where each dictionary describes a pipeline, and each key/value pair specifies a setting for that pipeline. The example shows two different pipelines described by their IDs and configuration paths. For the first pipeline, the value of pipeline.workers is set to 3, while in the other, the persistent queue feature is enabled. The value of a setting that is not explicitly set in the pipelines.yml file will fall back to the default specified in the logstash.yml settings file.

When you start Logstash without arguments, it will read the pipelines.yml file and instantiate all pipelines specified in the file. On the other hand, when you use -e or -f, Logstash ignores the pipelines.yml file and logs a warning about it."
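
In other words, my understanding is that the two startup modes look roughly like this (the exact paths are just my assumption based on my D:/Logstash install):

To start everything listed in pipelines.yml:
    D:\Logstash\bin\logstash.bat

To run a single pipeline config, ignoring pipelines.yml:
    D:\Logstash\bin\logstash.bat -f D:\Logstash\vpc.conf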

When I start up Logstash without any arguments, I get this error:

[2024-02-07T15:39:09,759][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:vpc, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 19, column 3 (byte 477) after ", :backtrace=>["D:/Logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:239:in `initialize'", "org/logstash/execution/AbstractPipelineExt.java:173:in `initialize'", "D:/Logstash/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "org/jruby/RubyClass.java:911:in `new'", "D:/Logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in `execute'", "D:/Logstash/logstash-core/lib/logstash/agent.rb:386:in `block in converge_state'"]}

As you can see, line 19 has nothing in it; it is just a commented-out line:

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
 pipeline.batch.size: 5000
 pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test  #LINE 19
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------

What is the exact command you are running?

To run a pipeline manually, you use the -f parameter pointing to a Logstash configuration file, normally saved as a .conf file.
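
For example, something like this from the Logstash install directory (the path is just an illustration; adjust it to wherever your .conf actually lives):

bin/logstash -f D:/Logstash/cloudtrail.conf

(On Windows the launcher is bin\logstash.bat, but the -f usage is the same.)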

What you shared is the logstash.yml file, which is not a pipeline configuration file; it is the settings file for the Logstash process.

So if I run logstash -f cloudtrail.conf, will it kick off the rest of the pipelines, or will it just run that one conf?

I have a few .conf files configured in my pipelines.yml; cloudtrail.conf is one of them.

If you run logstash -f cloudtrail.conf, it will run only the cloudtrail.conf pipeline.

If you want to run the multiple pipelines configured in pipelines.yml, you need to run Logstash without any parameters, or run it as a service, which is the easiest and most common way to run it.
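
For example, from your Logstash directory:

bin/logstash

With no -e or -f, Logstash reads pipelines.yml from its settings directory (config/ in a zip install like yours) and starts every pipeline listed there. Running it as a service does the same thing.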

Right. When I run it without any parameters, I get this error:

[2024-02-07T18:23:05,859][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:vpc, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"input\", \"filter\", \"output\" at line 19, column 3

This means that one of your Logstash configurations is wrong.

It tells you the id of the pipeline with the error: Create/pipeline_id:vpc

You need to share the configuration you are using for this pipeline, and share your pipelines.yml as well.

hmm interesting.

When I run logstash -f vpc.conf it works and ingests the logs correctly.

Here is the pipelines.yml:

# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
 - pipeline.id: cloudtrail
   pipeline.workers: 3
   path.config: 'D:/logstash/cloudtrail.conf'
 - pipeline.id: vpc
   path.config: 'D:/Logstash/vpc.conf'
   queue.type: persisted
#
# Available options:
#
#   # name of the pipeline
#   pipeline.id: mylogs
#
#   # The configuration string to be used by this pipeline
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#
#   # The path from where to read the configuration text
   path.config: "/D:/Logstash/*.conf"
#
#   # How many worker threads execute the Filters+Outputs stage of the pipeline
#   pipeline.workers: 1 (actually defaults to number of CPUs)
#
#   # How many events to retrieve from inputs before sending to filters+workers
#   pipeline.batch.size: 125
#
#   # How long to wait in milliseconds while polling for the next event
#   # before dispatching an undersized batch to filters+outputs
#   pipeline.batch.delay: 50
#
#   Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
#   "auto" automatically enables ordering if the 'pipeline.workers' setting
#   is also set to '1', and disables otherwise.
#   "true" enforces ordering on a pipeline and prevents logstash from starting
#   a pipeline with multiple workers allocated.
#   "false" disable any extra processing necessary for preserving ordering.
#
#   pipeline.ordered: auto
#
#   # Internal queuing model, "memory" for legacy in-memory based queuing and
#   # "persisted" for disk-based acked queueing. Defaults is memory
#   queue.type: memory
#
#   # If using queue.type: persisted, the page data files size. The queue data consists of
#   # append-only data files separated into pages. Default is 64mb
#   queue.page_capacity: 64mb
#
#   # If using queue.type: persisted, the maximum number of unread events in the queue.
#   # Default is 0 (unlimited)
#   queue.max_events: 0
#
#   # If using queue.type: persisted, the total capacity of the queue in number of bytes.
#   # Default is 1024mb or 1gb
#   queue.max_bytes: 1024mb
#
#   # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
#   # Default is 1024, 0 for unlimited
#   queue.checkpoint.acks: 1024
#
#   # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
#   # Default is 1024, 0 for unlimited
#   queue.checkpoint.writes: 1024
#
#   # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
#   # Default is 1000, 0 for no periodic checkpoint.
#   queue.checkpoint.interval: 1000
#
#   # Enable Dead Letter Queueing for this pipeline.
#   dead_letter_queue.enable: false
#
#   If using dead_letter_queue.enable: true, the maximum size of dead letter queue for this pipeline. Entries
#   will be dropped if they would increase the size of the dead letter queue beyond this setting.
#   Default is 1024mb
#   dead_letter_queue.max_bytes: 1024mb
#
#   If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
#   have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
#   may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
#   being available to be read by the dead_letter_queue input when items are written infrequently.
#   Default is 5000.
#
#   dead_letter_queue.flush_interval: 5000

#   If using dead_letter_queue.enable: true, defines the action to take when the dead_letter_queue.max_bytes is reached,
#   could be "drop_newer" or "drop_older".
#   With drop_newer, messages that were inserted most recently are dropped, logging an error line.
#   With drop_older setting, the oldest messages are dropped as new ones are inserted.
#   Default value is "drop_newer".
#
#   dead_letter_queue.storage_policy: drop_newer

#   If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has
#   expired the events could be automatically deleted from the DLQ.
#   The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,
#   to represent a five days interval.
#   The available units are respectively d, h, m, s for day, hours, minutes and seconds.
#   If not specified then the DLQ doesn't use any age policy for cleaning events.
#
#   dead_letter_queue.retain.age: 1d

#
#   If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
#   Default is path.data/dead_letter_queue
#
#   path.dead_letter_queue:

Here is the VPC conf:

# S3 input filter - final

input {
  # stdin {}
  file {
    type => "cloudtrail"
    path => "D:/01-evidence/vpc/*.log"
    sincedb_path => "D:/logstash/data/queue/sincedb_aws-vpc_ingest.log"
    mode => "read"
    codec => "plain"
    file_completed_action => "log"
    file_completed_log_path => "D:/logstash/logs/logstash-aws-vpc_logs_read.log"
  }
}

filter {
  geoip {
    source => "SourceAddress"
    target => "geoip"
    add_tag => ["vpc-geoip"]
    ecs_compatibility => "disabled"
  }
  csv {
    source => "[message]"
    target => "[csv]"
    separator => " "
    columns => [ "Version","Account-ID","Interface-ID","SourceAddress","Dest Address","SourcePort","Dest Port","Protocol","Packets","Bytes","Start","End","Action","Log-Status" ]
    skip_empty_columns => "true"
    skip_empty_rows => "true"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://1.1.1.1:9200"]
    index => "vpc-pipe-test"
  }
}

Try commenting that out.

New error when I try that:

[2024-02-07T19:31:54,758][ERROR][logstash.config.sourceloader] No configuration found in the configured sources.

Made two changes, listed below. Now when it runs, it ignores the cloudtrail conf and just runs the VPC conf.

  1. In logstash.yml, pointed path.config to the pipelines.yml: path.config: "/D:/Logstash/config/pipelines.yml"
  2. In pipelines.yml, added a / before the 2 path.config lines:
    path.config: "/D:/Logstash/vpc.conf"

Got it working. JSON in my evidence location was the issue.
