Is it possible to select which events are processed by each worker?

Hi,

Logstash 6.7.1

My workflows are a little bit complicated, since:

  • I need to parse together different lines (events)
  • the lines are not consecutive
  • the lines do not have any common label or tag

I implemented the filtering using the Ruby filter plugin.
In the Ruby code, I do things like this (in Ruby, but 'à la Python'):

if event.get("interesting_field1")
    # --- record in memory the first field we want ---
    $field1 = event.get("interesting_field1")
    event.cancel

elsif event.get("interesting_field2")
    # --- record in memory the second field we want ---
    $field2 = event.get("interesting_field2")
    event.cancel

elsif event.get("final_field")
    # --- now release the fields we want ---
    event.set("field1", $field1)
    event.set("field2", $field2)

else
    event.cancel
end

Unless I misread the results, I had to run Logstash with workers=1. Otherwise each worker would process a random selection of events, and I couldn't correlate them as I need.
First question: is what I just said correct? Or is it still possible to get the same results with multiple workers?
If yes, does that improve performance?

Anyways...

  • I will be parsing data from multiple hosts with Logstash (each host sending its output with Filebeat).
  • So the "hostname" is also another field in each event.
  • I only need to analyze together events coming from the same host (of course, the actual Ruby code is more complex, as I record values in hashes, not just simple variables).

So I was wondering if there is a way to run multiple workers, forcing each worker to process only the events from a single input host.
In other words, select the worker based on the value of the "hostname" field. Is that possible?

Or would it be more efficient -in terms of CPU usage and memory consumption- to run one Logstash instance per core, each one listening on a different port, and configure each Filebeat to send its output to one of these ports?

Any comment is more than welcome.
Cheers,
Jose

I would suggest using an aggregate filter rather than global variables in ruby filters. You are still limited to a single worker thread per pipeline.
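For the pattern in your example, a minimal sketch of the aggregate approach could look like this (the hostname key, field names, and timeout are assumptions based on your description):

filter {
  if [interesting_field1] or [interesting_field2] {
    aggregate {
      task_id => "%{hostname}"    # one map per host, so hosts no longer interfere
      code => "
        map['field1'] ||= event.get('interesting_field1')
        map['field2'] ||= event.get('interesting_field2')
        event.cancel
      "
    }
  } else if [final_field] {
    aggregate {
      task_id => "%{hostname}"
      code => "
        event.set('field1', map['field1'])
        event.set('field2', map['field2'])
      "
      end_of_task => true         # discard this host's map once released
      timeout => 300              # drop stale maps after 5 minutes
    }
  } else {
    drop { }
  }
}

The map replaces the global variables and is keyed per host, although, as noted above, the pipeline still needs a single worker.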

You could run a single logstash instance with multiple pipelines that do the same thing, except they have beats inputs listening on different ports.
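For example, a sketch where the two pipeline configurations differ only in the beats port (paths and names illustrative):

# pipeline_0 config
input { beats { port => 5000 } }

# pipeline_1 config
input { beats { port => 5001 } }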

Hmm, I didn't think of running multiple pipelines at once. That should work.
Will each pipeline run on a different core (assuming # cores > # pipelines)?

Yes, each pipeline has its own worker threads, so I would expect the OS to schedule them across all the cores.

Having multiple pipelines, each one listening to a different port, does not seem to work.
My pipelines.yml looks like this:

- pipeline.id: pipeline_0
  path.config: /etc/logstash/conf.d/logstash_0
- pipeline.id: pipeline_1
  path.config: /etc/logstash/conf.d/logstash_1

where the configuration files for logstash_0 and logstash_1 are identical, except that the first one listens on port 5000 and the second one on port 5001.
I then have 2 Filebeat hosts writing to port 5000, and 2 writing to port 5001.

For testing purposes, the output plugin of each pipeline is file, with each pipeline writing to a separate file.
In the output file of each pipeline I see data coming from hosts that are supposed to write to the other pipeline/port. For example, in the output from logstash_1, I have data coming from hosts writing to port 5000.
How is that possible?

If you had logstash configured the way you think you have, this would not be happening. Therefore your configuration does not look the way you think it does. I would run with '--config.debug --log.level debug' and triple-check that the pipeline configurations look the way you want them to.

The log file seems to be correct: it started a server on port 5000 for pipeline_0, and on port 5001 for pipeline_1.
This is quite a mystery.

BTW, I also have /etc/logstash/logstash.yml like this:

path.data: /var/lib/logstash
path.logs: /var/log/logstash
log.level: debug
pipeline.workers: 1

I am assuming that it applies to each one of the entries in pipelines.yml. Is that correct?

Yes, that sets a default which can be overridden in pipelines.yml
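For example (a sketch), the override sits on the pipeline entry itself:

- pipeline.id: pipeline_0
  path.config: /etc/logstash/conf.d/logstash_0
  pipeline.workers: 2    # overrides the logstash.yml default for this pipeline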

Something you might want to try is to add a tag (mutate+add_tag) that identifies which pipeline an event went through and verify that tags has the expected value.
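A sketch of what that could look like (the tag value is arbitrary; use a distinct one in each pipeline's config):

filter {
  mutate {
    # arbitrary marker recording which pipeline processed the event
    add_tag => [ "via_pipeline_0" ]
  }
}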

Is it possible that the pipelines.yml file is not picked up and the two configuration files therefore are concatenated? Can you move them to a different directory and update pipelines.yml to see if that has any effect?

Hmm. I can try adding a new field in the Filebeat config, and search for that value in the Logstash filter.
I am using Filebeat 6.6.7.
Would it be like this?

- type: log
  paths:
    - /var/log/app/mylogs*
  fields:
    otherfield: othervalue
    port: 5010

So then, in Logstash, I can filter like this?

filter {
    # port was set as a number in the Filebeat config, so compare it unquoted
    if [fields][port] == 5010 {
        # do the actual filtering
    }
}

No, add a tag (not a field) in the logstash configuration. That way, if Christian is correct (and his is a very viable theory), you will end up with both tags.

Or, as I said, you could run with '--config.debug --log.level debug' and see what these lines look like

[2019-06-12T18:47:35,464][DEBUG][logstash.config.pipelineconfig] -------- Logstash Config ---------
[2019-06-12T18:47:35,468][DEBUG][logstash.config.pipelineconfig] Config from source {:source=>LogStash::Config::Source::MultiLocal, :pipeline_id=>:main}
[2019-06-12T18:47:35,471][DEBUG][logstash.config.pipelineconfig] Config string {:protocol=>"file", :id=>"/home/user/simple.conf"}
[2019-06-12T18:47:35,476][DEBUG][logstash.config.pipelineconfig]

input { [...]

You should have two sets of those. One for each pipeline.

Thinking deeper about this, there is another piece of information that may be really relevant.
As part of the filter { } setup, I am using the ruby plugin.
Maybe the problem is there: could it be that the Ruby code is not thread safe?
My Ruby code, as I am new to Ruby, has all variables starting with $. Maybe they should be @@?

It is definitely possible for ruby filters not to be thread safe. In most ruby filters that I write, I use neither $ nor @@, nor even @.

In Ruby, the prefix on a variable determines its scope:

  • $ is a global variable, shared across the whole process. Don't use this.
  • @@ is a class variable and is shared across all instances of the class. Don't use this either.
  • @ is an instance variable, held only by the instance (in this case, the instance of the generated Ruby class for each plugin invocation). You'll need to implement your own thread-safety measures if you plan to share it across workers, but with pipeline.workers: 1 it should behave as you expect (see the sketch after this list).
  • no prefix is a local variable; use this for temporary variables that are not needed when processing other events.
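Rewritten with instance variables, your original snippet might look like this (a sketch reusing the field names from your example):

filter {
  ruby {
    code => '
      # @field1 and @field2 persist across events within this filter instance
      if event.get("interesting_field1")
        @field1 = event.get("interesting_field1")
        event.cancel
      elsif event.get("interesting_field2")
        @field2 = event.get("interesting_field2")
        event.cancel
      elsif event.get("final_field")
        event.set("field1", @field1)
        event.set("field2", @field2)
      else
        event.cancel
      end
    '
  }
}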

I was using $ as I need to keep those variables between events, since I process several events together in a way that cannot be done by the aggregate filter. So I thought, naively, that using $ was the simplest way to keep those variables in memory.
I will try with @ then. I will post the results here.

Yes, replacing the global variables with instance variables works.

When I was writing the Ruby code, I didn't know exactly how Logstash uses it. I still don't. But from this experience, I am guessing Logstash takes the code and uses it to implement a class from which it instantiates objects. Is that correct?
As I didn't know that, I never even considered using class or instance variables, since I had no idea the code in the ruby plugin ends up inside objects. I thought local and global variables were my only two choices.

Maybe making this more explicit in the documentation would be helpful?
