More input based on first input (multiple pipelines?)

I am currently looking to build a solution where I can load input based on some other input. My setup has two inputs:

  • The data I need
  • The timestamp of the latest update of this data

I only want to get the data (which takes multiple API calls) if the timestamp is newer than the last time I pulled the data. I already have a Logstash script that can compare the dates.

What I am unsure of is how I get the data when the timestamp has been updated.
As the whole point is to not send too many requests to the data server, I can't load it in the same Logstash script during the input phase.
I have been looking into multiple pipelines, where I only send data to a virtual address if the timestamp has been updated.
The other pipeline needs to make the API calls, but it still runs even if there is no data at the virtual address. And since conditionals can't be used in the input phase, I can't see how to keep it from running when no data is sent to the virtual address.

Any ideas? Is this even possible?

Hi

If I understand your issue correctly, you'll have two pipelines, A and B, and A sends something to B. A compares the dates and B is supposed to fetch your data only if the dates match some condition of yours.

I'm thinking you could configure pipeline A to send an event to B only when the dates match, and add a tag to that event. Something like this:

input {
  # <your code to get the timestamp of the latest update into a field "my_utdate_time">
}

filter {
  if [my_utdate_time] >= [@timestamp] {
    mutate {
      add_tag => [ "go_get_it" ]
    }
  }
}

output {
  if "go_get_it" in [tags] {
    pipeline { send_to => gogetit }
  }
}

Then you configure pipeline B in such a way that it will only execute the input if your go_get_it tag is in the event. Something like this:

input {
  pipeline { address => gogetit }

  if "go_get_it" in [tags] {
    # <fetch the data>
  }
}

I don't know whether you can use an if statement in the input{} section. Probably not.

Maybe you can solve this by having two flows in pipeline A. You will have to separate the flows using tags in your config, but that's easy to do. Let's call them flow A1 and flow A2.

Flow A1 saves a flag to a text file iff your times match.

Flow A2 listens for changes on that file, and sends a signal to pipeline B iff there's a change.

A bit convoluted, I admit it. Maybe someone else will come up with an easier, more elegant solution.

Hope this helps.

My first idea was also to do a conditional like you suggest with the [tags].
But I ran into an error, and some searching on the forum led me to conclude that if statements won't work in input{}.
Curiously though, when I set a tag and tell my pipeline B to only do something if it's there, it simply ignores the if statement and gets the data anyway. But only the first time it runs. On subsequent runs, I get my input from the virtual address printed to stdout, but I can see it does not fetch anything. I am using http_poller to get the data.

I'm not quite sure what your second suggestion covers - the input is API calls and the output is ES, so I don't actually have any text files in the setup.

Here's my code for pipeline A:

input {
  http_poller {
    urls => {
      MyTimestamp => {
        method => get
        user => "[user]"
        password => "[pass]"
        url => "[url]"
        headers => {
          Accept => "application/json"
        }
      }
    }
    schedule => { cron => "* * * * * Europe/Copenhagen" }
    codec => plain
    metadata_target => "http_poller_metadata"
    tags => ["go_get_it"]
  }
}

filter {
  if [http_poller_metadata][response_message] == "OK" {

    xml {
      source => "message"
      target => "json_message"
    }

    mutate {
      add_field => { "newest_timestamp" => "%{[json_message][version][-1][date]}" }
      add_field => { "response" => "%{[http_poller_metadata][response_message]}" }
      add_field => { "graph_id" => 10461 }
      remove_field => "message"
    }

    date {
      match => ["newest_timestamp", "ISO8601"]
      target => "time"
    }

    # Look up the timestamp stored in Elasticsearch from the last run
    elasticsearch {
      hosts => ["host"]
      user => "[user]"
      password => "[pass]"
      index => "[index_name]"
      query => "graph_id:%{[graph_id]}"
      enable_sort => false
      fields => { "time" => "es_time" }
    }

    date {
      match => ["es_time", "ISO8601"]
      target => "es_timestamp"
    }

    prune {
      whitelist_names => ["time", "es_timestamp", "graph_id", "response", "tags"]
    }
  }
}

output {
  if [response] == "OK" {

    if [time] > [es_timestamp] or ![es_timestamp] {
      elasticsearch {
        hosts => ["host"]
        index => "[index]"
        document_id => "%{[graph_id]}"
      }
      pipeline { send_to => getrequests }
    }

    stdout {
      codec => rubydebug
    }
  }
}

And the second pipeline looks like this:
input {
  pipeline { address => getrequests }

  if "go_get_it" in [tags] {
    # Here there would be multiple calls; for simplicity it's only 1 right now
    http_poller {
      urls => {
        MyTimestamp => {
          method => get
          user => "[user]"
          password => "[pass]"
          url => "[url]"
          headers => {
            Accept => "application/json"
          }
        }
      }
      codec => plain
      schedule => { "in" => "0" }
      metadata_target => "http_poller_metadata"
    }
  }
}

filter {}

output {
  stdout {
    codec => rubydebug
  }
}

And then everything is run from the pipelines.yml:

- pipeline.id: my_timestamp
  path.config: "/Elastic/logstash-7.0.0/config/my_timestamp.conf"
# Updater
- pipeline.id: updater
  path.config: "/Elastic/logstash-7.0.0/config/updater.conf"

Sorry, rereading my own answer I see it won't work. Conditionals cannot be used in input{}, so it doesn't matter how many flows or pipelines we add.

A possible solution would be to use a single pipeline with two flows, again A1 and A2, separated using tags. The first flow, A1, would get and compare the dates, and its output{} would contain an exec{} plugin where you curl, or wget, or whatever, your data into a file.

The second flow, A2, will simply listen to that file (or files) and parse it to get your data into Elasticsearch.

The "problem" when reading a file is that it will be read once every time you restart your pipeline, regardless of whether there have been changes to it or not.

The exec{} output looks like it could do the job, but I am stuck on how to get the output anywhere other than stdout.

I have now configured my output to:

output {
  if [response] == "OK" {
    if [time] > [es_timestamp] or ![es_timestamp] {
      exec {
        command => "curl -X GET -u USER:PASS 'https://URL'"
      }
      file {
        path => "/Elastic/logstash-7.0.0/config/test.txt"
      }
    }
  }
}

But I can't get my data out into the file, and the documentation for the exec output hasn't helped me much. The data from the curl command shows up in my cmd, so that part works. I just don't know how to get it anywhere else.

Hi

Check the documentation for curl (e.g. https://curl.haxx.se/docs/manpage.html#-o). Option -o might be what you need to add to your curl command.
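
For example, adapting your exec output from above (untested, and the output path is just an illustration):

exec {
  # With -o, curl itself writes the response body to the file
  command => "curl -X GET -u USER:PASS -o /Elastic/logstash-7.0.0/config/test.txt 'https://URL'"
}

That way curl does the writing, and you wouldn't need the separate file{} output at all.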

Hope this helps

Ah yes, of course! I was stuck thinking it should be exec doing the output! Thank you. I'll dive into it and post my solution when I have one.

I ended up with a solution where I use the http filter plugin, which had completely escaped my radar.

So now I still get my input as before and make the comparison, but instead of trying to get this info to another pipeline, the conditional is simply made in the filter section:

if [time] > [es_timestamp] or ![es_timestamp] {
  http {
    url => "https://..."
    verb => GET
    user => "USER"
    password => "PASS"
  }
}
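
One note for anyone finding this later: by default the http filter puts the response body into the [body] field. Since I have multiple calls, giving each call its own target keeps them from overwriting each other; something like this (the target field name is just an example):

http {
  url => "https://..."
  verb => GET
  user => "USER"
  password => "PASS"
  target_body => "first_api_response"  # each call writes its body to its own field
}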
