Pagination in Logstash

Hello there!
I am fetching data from the vRops API using http_poller.
Is there any way to achieve pagination in Logstash?

Hello Anik,

I am not aware of a built-in way to do this. If you really need paging, you only have a few options:

  1. Create a custom http_poller plugin (too much work, I guess)
  2. Use the exec input plugin to call a script that reads and paginates the data (may be a good option if you are comfortable with scripting; see the sketch after this list)
  3. Create a recursive Logstash pipeline (also a good option because the complete logic is stored and administered in Elastic)
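
For option 2, a rough sketch of what the exec input could look like (the script path and interval are placeholders; the script itself would have to implement the paging loop and print one JSON document per line):

input {
    # hypothetical script that loops over all pages of the API
    # and prints one JSON document per line
    exec {
        command => "/opt/scripts/fetch_vrops_pages.sh"
        interval => 300
        codec => "json_lines"
    }
}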

We used option 3 to get data from a cloud provider which limits the amount of data that can be read within a single request. This is our general setup:

  • We use the heartbeat input to trigger our pipeline regularly (a minimal sketch of the trigger pipeline follows below)
  • We have a second input that allows the pipeline to trigger itself (can be a file or kafka or ...)
  • We switched from the http_poller input to the http filter plugin
  • If the response contains a nextPageToken entry, the pipeline triggers itself with this token to get the next page
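
A minimal sketch of the trigger pipeline (interval and path are just examples):

input {
    # fires a trigger event at a fixed interval
    heartbeat {
        interval => 300
        message => "trigger"
    }
}
output {
    # hand the trigger to the paging pipeline via its file input
    file {
        path => "/opt/elastic/logstash/file_input/sensors-trigger-%{+yyyy-MM-dd}.json"
    }
}

The file output writes into the directory watched by the paging pipeline's file input, so every heartbeat starts a fresh run at page one.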

Best regards
Wolfram

Hey @Wolfram_Haussig
Thank you so much for your response!
I am having a hard time understanding option 3.
Could you please attach some code samples or references for better understanding?

Hi,

No problem. Here is the main part (I removed some code which is not relevant for you). I have another pipeline with the heartbeat input, as I do a login and other required steps before pagination, but this should be easy enough to figure out:

input {
    # get initial trigger from Sensor-Data-Test-Init
    # or get the nextPageToken from itself

    file {
        path => "/opt/elastic/logstash/file_input/sensors-*.json"
        codec => "json"
    }
}
filter {
# get the first/next page depending on whether nextPageToken is set
if [nextPageToken] {
    mutate {
        rename => { "[nextPageToken]" => "[@metadata][currentPageToken]" }
    }
    http {
        url => "https://api.mytarget.com/v2/devices?page_size=100&page_token=%{[@metadata][currentPageToken]}"
        headers => {
            "authorization" => "Bearer %{[access_token]}"
        }
        target_body => "[body]"
        target_headers => "[headers]"
    }
} else {
    http {
        url => "https://api.mytarget.com/v2/devices?page_size=100"
        headers => {
            "authorization" => "Bearer %{[access_token]}"
        }
        target_body => "[body]"
        target_headers => "[headers]"
    }
}
# create a separate event for the nextPageToken to be sent to this pipeline again
# the old token was already moved to [@metadata], so it is not forwarded
if [body][nextPageToken] and [body][nextPageToken] != "" and [@metadata][currentPageToken] != [body][nextPageToken] {
    ruby {
            code => "
                    generated = LogStash::Event.new
                    generated.set('nextPageToken', event.get('[body][nextPageToken]'))
                    generated.set('access_token', event.get('[access_token]'))
                    generated.set('project_id', event.get('[project_id]'))
                    new_event_block.call(generated)
                    "
    }
}
# split response JSON into one event for each device
split {
    field => "[body][devices]"
    target => "[devices]"
    remove_field => [ "body", "headers" ]
}
}
output {
# send events containing sensors to elasticsearch
if ![nextPageToken] {
    elasticsearch {
        id => "write-device-data"
        hosts => ["elastic-01.mycompany.com:9200","elastic-02.mycompany.com:9200","elastic-03.mycompany.com:9200"]
        ssl => true
        user => "logstash_admin"
        password => "${output.elasticsearch.password}"
        cacert => "/etc/ssl/certs/cacert.pem"
        ssl_certificate_verification => true
        document_id => "%{[devices][name]}"
        index => "sensordata-devices"
    }
}
# send the nextPageToken event back to this pipeline to process the next page
if [nextPageToken] {
    file {
        path => "/opt/elastic/logstash/file_input/sensors-next-%{+yyyy-MM-dd}.json"
    }
}
}
  1. The input reads data which can be a simple trigger from heartbeat or a trigger from the pipeline itself containing a nextPageToken. I use a file input here, but you can also use kafka or similar (see the kafka sketch after this list). Only the beats input plugin is not possible, because the pipeline cannot start the output while the input plugin has not started, and the pipeline has to be able to trigger itself.
  2. Depending on the availability of the nextPageToken, the pipeline will either request the first page (without token) or the next page (with token) using the http filter.
  3. If the response has a nextPageToken, we create a new event containing the token using the ruby filter; it is sent back to the pipeline to trigger loading the next page.
  4. We split the response into separate events using the split filter.
    5a. If the document does not contain a nextPageToken, it is a payload document and we send it to Elasticsearch.
    5b. If the document contains a nextPageToken, it is the trigger for the next page, so we write it to a file to trigger the pipeline again.
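
If you would rather avoid the file round trip, the same feedback loop should also work with kafka instead of the file input/output. A sketch with placeholder broker and topic names:

input {
    kafka {
        bootstrap_servers => "kafka-01.mycompany.com:9092"
        topics => ["sensors-next-page"]
        codec => "json"
    }
}
output {
    # only the token event goes back into the loop
    if [nextPageToken] {
        kafka {
            bootstrap_servers => "kafka-01.mycompany.com:9092"
            topic_id => "sensors-next-page"
            codec => "json"
        }
    }
}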

Maybe this is not the most beautiful way but it works for us.

Best regards
Wolfram

