Using Ruby to update a local Linux environment variable

I am ingesting some data with Logstash, with Kafka as an input.
Is it possible, with Ruby, to configure a filter that will update a Linux environment variable with what I am getting from Kafka?
Let's say I get something like the below from Kafka:
version_number = 1
I want Ruby to set this as an environment variable on my system, so I can then use the environment filter plugin in Logstash to fetch this value when I need it.
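Something like the sketch below is what I had in mind; it is untested, and I am not even sure a ruby filter can change anything outside the Logstash process itself:
    filter {
      ruby {
        # untested idea: copy the value from the Kafka event into an environment variable;
        # as far as I understand, this would only affect the Logstash JVM process, not the whole system
        code => "ENV['VERSION_NUMBER'] = event.get('version_number').to_s"
      }
    }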

This isn't straightforward to do, and an environment variable is most likely not what you want to use. If you explain in greater detail what you're trying to do, it'll be easier to make a suggestion.

My use case is that I have a single pipeline, where:

  1. The first input gets data from Kafka, a field called version_number. The output looks like:
    version_number=1
    This version number will be sent only once. If it changes, it will be sent once more with the new version number, such as:
    version_number=2
  2. The second input listens for ActiveMQ data, and this data is sent continuously.

What I want to achieve is for the events from the ActiveMQ bus to carry the version_number from Kafka, so I can write it to Elasticsearch.
From my understanding:

  • Logstash does not have a Kafka poller plugin that would continuously return the version_number
  • I cannot use the Logstash aggregate filter, as this would only get the version_number once in the output, not continuously

I want to keep a configuration where I have one pipeline, if possible.
I was thinking I could do:
  • Get the version_number from Kafka and save it as an environment variable (with Ruby, maybe)?
  • Pass this version_number to my ActiveMQ output with the environment filter plugin. I tested the environment filter plugin and it seems to work well; the only trick is to pass this Kafka output to Logstash correctly so I can use it. I only tested with an environment variable I set myself, manually.

Any suggestions?

You can have a kafka input that listens for version_number messages and writes those to a file that the translate filter can read. The ActiveMQ part of your pipeline could then have a translate filter that reads the file.
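Very roughly, and untested, something like this (names, paths, and the "latest"/"lookup_key" key are only placeholders; the details of keeping the file in key,value form are worked out further down):
    # Kafka side: keep a small dictionary file up to date
    input {
      kafka {
        bootstrap_servers => "localhost:9092"   # placeholder
        topics => ["version_topic"]             # placeholder topic name
      }
    }
    output {
      file {
        path => "/tmp/version_number.csv"
        # one key,value line per message so the translate filter can use it as a dictionary
        codec => line { format => "latest,%{version_number}" }
      }
    }
    # ActiveMQ side: look the value up on every event
    filter {
      mutate { add_field => { "lookup_key" => "latest" } }
      translate {
        field => "lookup_key"
        dictionary_path => "/tmp/version_number.csv"
        destination => "version_number"
        refresh_interval => 60
      }
    }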

I answered you before with the same suggestion as Magnus - here

I specified two pipelines to keep the two processing paths clean and separate. LS 6.x can use multiple pipelines in the same LS instance.

Get two pipelines working first, then you can converge them into one by using conditional (if) blocks to separate the specific flows.
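With LS 6.x, pipelines.yml would just have two entries, something like this (the ids and paths are only examples):
    - pipeline.id: version-from-kafka
      path.config: "/etc/logstash/conf.d/version_kafka.conf"
    - pipeline.id: activemq-to-es
      path.config: "/etc/logstash/conf.d/activemq.conf"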

Thank you both for your answers. My concern is that the version_number I actually have on the Kafka topic already comes from an XML file. So I would retrieve the needed value from the XML file, put it on the Kafka topic, and then place it in yet another file. Is this the neatest way to achieve what I need?

So my full workflow would look like this:

  1. I have an input configuration with an XML file, from which I get the version_number and put it on a Kafka topic.
    The version_number is updated by a user placing a file in a specific directory; Logstash will check the directory, extract the version_number from the file, and place it on the bus.
  2. I get the version_number and put it in another file, let's say version_number.csv.
  3. I enrich my ActiveMQ data with the version_number, which comes from version_number.csv, which in turn comes from my primary XML file.

Would it be possible to:

  1. Get the version_number from the XML file and place it on a Kafka topic.
  2. Get the version_number from the Kafka topic and put it into Redis as a key/value pair.
  3. Read the value from Redis and enrich my ActiveMQ output with it?
    Or is this not going to work either?

OK, so you did not tell us about the LS that reads the XML file and puts it in Kafka.

This means you can eliminate Kafka entirely.

Have the 1st LS instance read the XML file and write the CSV file directly. Have the 2nd LS (ActiveMQ) use the translate filter to periodically read the CSV file.


Thanks so much for your suggestion, I think this will simplify my workflow.
I will follow your advice!

So I have the following working:

  1. I can read my XML file and output the version_number as follows:
    output {
      csv {
        path => "/tmp/version_number.csv"
        create_if_deleted => true
        fields => ["version_number", "@timestamp"]
      }
    }
    My CSV file contains:
    issue 01 01,2018-01-04T15:05:27.429Z

  2. Now I am trying to get this into my ActiveMQ output.
    I have used the translate plugin in my filter as follows:
    translate {
      field => "version_number"
      refresh_interval => "1"
      destination => "version_number"
      add_field => { "version_number" => "version_number" }
      dictionary_path => '/tmp/version_number.csv'
    }

No field appears though! What am I doing wrong?
Basically I would just like to add the version_number field.

I think you should use the exec output as I explained in your original post, because the LHS of the pair needs to be a field value that is found in the event, in the field that the translate filter is set to use.

But what is the value in the field version_number in the event before you do the translate?

In my previous reply I said you should have a "constant" field value, meaning you should add a field in the activemq input, e.g. version_number -> 'to_be_sourced'; then the CSV file that you overwrite (with the exec output) has the contents to_be_sourced,issue 01 01. The translate filter will translate the value to_be_sourced into whatever the CSV file has as the RHS of the pair.

So initially the CSV file is to_be_sourced,issue 01 01, then later it becomes to_be_sourced,issue 01 02, and even later it is to_be_sourced,issue 02 11 - the LHS remains the same.

You can still use your CSV output solution, but then you also need to add a field, say constant_out, with a value of to_be_sourced; the csv output then has fields => ["constant_out", "version_number"].

This will yield the required CSV file where the RHS of the pair changes but the LHS remains the same.
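Sketched out, and untested, it would look roughly like this (the field and file names follow what we have discussed so far; current_version is just an example name for the enriched field):
    # in the jms (ActiveMQ) input: every event carries the constant lookup key
    add_field => { "version_number" => "to_be_sourced" }

    # in the filter section of the XML flow: add the constant LHS
    mutate {
      add_field => { "constant_out" => "to_be_sourced" }
    }

    # in the output section of the XML flow: write "to_be_sourced,<current version>" to the dictionary file
    csv {
      path => "/tmp/version_number.csv"
      create_if_deleted => true
      fields => ["constant_out", "version_number"]
    }

    # in the filter section of the ActiveMQ flow: translate the constant into the current version
    translate {
      field => "version_number"               # always holds "to_be_sourced"
      dictionary_path => "/tmp/version_number.csv"
      destination => "current_version"        # example name for the enriched field
      refresh_interval => "1"
    }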

Ok, I think I am getting there!
What I have in my single config file is:

  1. Two inputs, one file (XML) and one jms (ActiveMQ).
    In my file (XML) input, I watch the particular file I need the version_number from. I have not added anything else.
    In my jms input, I have added:
    add_field => {
      "version_number" => "to_be_sourced"
    }
  2. In the filter section, I have two conditions; if type == activemq, I process the ActiveMQ data. There I have added:
    translate {
      field => "version_number"
      refresh_interval => "1"
      destination => "current-version"
      dictionary_path => '/tmp/version_number.csv'
    }
    In my file filter, I have added a new field as follows:
    mutate {
      add_field => {
        "constant_out" => "to_be_sourced"
      }
    }

  3. My output handles the two types again, file + ActiveMQ.
    In the file type output, I have the following configuration:
    csv {
      path => "/tmp/version_number.csv"
      create_if_deleted => true
      fields => ["constant_out", "version_number"]
    }
    In the ActiveMQ output, I write the data to Elasticsearch as needed.

This almost works! I have the following field added in my ActiveMQ output:
"current-version" => "issue 123456789"
and this gets written to Elastic as required!

I tested it a bit and see:

  1. If I append the CSV file manually where I store the version_number, Logstash picks up the version number and adds it to my ActiveMQ output - great!
  2. If I copy a new XML file, which has a new version number, in place of the old XML file, my new file doesn't seem to get picked up. The version in the CSV file stays the same.
  3. If I stop and start my pipeline, Logstash picks up the new version from the replaced XML file.

I see you mentioned the exec plugin, which I haven't used. Maybe this is my problem.
How do I ensure that Logstash processes the changed XML file (with the exact same name and location, but a different version number inside) so my CSV file gets updated?

NOTE: I have noticed that when I append the XML file manually, sometimes my change gets picked up and the new version_number value gets put in the CSV file. The same happens when I move the file from filexyz to originalfile. It works some of the time, but not always.

Looking at this line, the actual refresh is not done until two things happen: 1) an event comes along, and 2) enough time has elapsed.

In my original reply to you I advised that this latency could be a problem - but at least it works in principle.
I also advised that you may need to switch the translate filter out for the jdbc_streaming filter and use a local SQLite db. In this case you would use the exec output and the SQLite CLI to update a single-column, single-record table. As the jdbc_streaming filter does a "live" call to the DB, it will pick up the new value immediately, provided you turn off caching.

Also, because you are using a single LS instance with one pipeline, you will have to "prime" the db with the first version number, because you can't start the XML read pipeline a few minutes before the ActiveMQ one.
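As a rough, untested sketch of the lookup side (the db path, table and column names are made up, and you would need the SQLite JDBC driver jar on disk) - the XML flow would then use the exec output to run an UPDATE against that single-row table via the sqlite3 CLI whenever a new version arrives:
    # prime the single-row table once before starting LS, e.g. from a shell:
    #   sqlite3 /tmp/version.db "CREATE TABLE current_version (version_number TEXT);"
    #   sqlite3 /tmp/version.db "INSERT INTO current_version VALUES ('issue 01 01');"
    filter {
      jdbc_streaming {
        jdbc_driver_library => "/path/to/sqlite-jdbc.jar"
        jdbc_driver_class => "org.sqlite.JDBC"
        jdbc_connection_string => "jdbc:sqlite:/tmp/version.db"
        statement => "SELECT version_number FROM current_version LIMIT 1"
        target => "version_lookup"     # the result rows end up as an array under this field
        use_cache => false             # live read so a new version is picked up immediately
      }
    }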

Just out of interest... if I store my version_number as a key/value in Redis, can I use some filter to read this value and enrich my ActiveMQ data with the version coming from Redis?

:smile: I had thought about using Redis; unfortunately we don't have an enhancement plugin that uses Redis. I even considered a Redis JDBC driver, but that is a very untested route which may lead to more problems than it's worth.

We could build a Redis lookup plugin, but it would not happen soon. It seems like a good fit to me. How do you see it being used? The Redis key sourced from the event, with the Redis value being a simple single value, a JSON object, or a CSV string?

It would be great if in the future there could be an enhancement plugin for Redis!
In my case a single value would be enough, I believe, but a JSON object could be a nice option as well.
