Hello there!
I am fetching data from the vROps API using http_poller.
Is there any way to achieve pagination in Logstash?
Hello Anik,
I am not aware of anything like this. If you really need paging you only have a few options:
- Create a custom http_poller plugin (too much work, I guess)
- Use the exec input plugin to call a script that reads and paginates the data (may be a good option if you are fit in scripting)
- Create a recursive Logstash pipeline (also a good option, because the complete logic is stored and administered in Elastic)
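For the second option, a minimal sketch of an exec input might look like this (the script path and interval are placeholder assumptions; the script itself would have to implement the paging loop and print one JSON document per line):

input {
  # hypothetical helper script that fetches all pages from the API
  # and prints each result as one JSON document per line
  exec {
    command => "/opt/elastic/scripts/fetch_all_pages.sh"
    interval => 300
    codec => "json_lines"
  }
}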
We used option 3 to get data from a cloud provider which limits the amount of data that can be read within a single request. This is our general setup:
- We use the heartbeat input to trigger our pipeline regularly
- We have a second input to allow the pipeline to trigger itself (can be a file or Kafka or ...)
- We switched from the http_poller input to the http filter plugin
- If the response contains a nextPageToken entry, the pipeline triggers itself with this token to get the next page
Best regards
Wolfram
Hey @Wolfram_Haussig
Thank you so much for your response!
I am having a hard time understanding option 3.
Could you please attach some code samples or references for better understanding?
Hi,
No problem. Here is the main part (I removed some code which is not relevant for you). I have another pipeline with the heartbeat input, as I do a login and other required steps before the pagination, but this should be easy enough to figure out (see the sketch of such a trigger pipeline further below):
input {
# get initial trigger from Sensor-Data-Test-Init
# or get the nextPageToken from itself
file {
path => "/opt/elastic/logstash/file_input/sensors-*.json"
codec => "json"
}
}
filter {
# get the first/next page depending if nextPageToken is set
if([nextPageToken]) {
mutate {
rename => ["[nextPageToken]", "[@metadata][currentPageToken]" ]
}
http {
url => "https://api.mytarget.com/v2/devices?page_size=100&page_token=%{[@metadata][currentPageToken]}"
headers => {
"authorization" => "Bearer %{[access_token]}"
}
target_body => "[body]"
target_headers => "[headers]"
}
} else {
http {
url => "https://api.mytarget.com/v2/devices?page_size=100"
headers => {
"authorization" => "Bearer %{[access_token]}"
}
target_body => "[body]"
target_headers => "[headers]"
}
}
# create a separate event for the nextPageToken to be sent to this pipeline again
# remove the old token from the other event
if ([@metadata][currentPageToken] != [body][nextPageToken] and [body][nextPageToken] != "") {
ruby {
code => "
# build a new event carrying only what the next request needs
generated = LogStash::Event.new
generated.set('nextPageToken', event.get('[body][nextPageToken]'))
generated.set('access_token', event.get('[access_token]'))
generated.set('project_id', event.get('[project_id]'))
# emit the generated event into the pipeline alongside the current one
new_event_block.call(generated)
"
}
}
# split the response JSON into one event for each device
split {
field => "[body][devices]"
target => "[devices]"
remove_field => [ "body", "headers" ]
}
}
output {
# send events containing sensors to elasticsearch
if(![nextPageToken]) {
elasticsearch{
id => "write-device-data"
hosts => ["elastic-01.mycompany.com:9200","elastic-02.mycompany.com:9200","elastic-03.mycompany.com:9200"]
ssl => true
user => "logstash_admin"
password => "${output.elasticsearch.password}"
cacert => "/etc/ssl/certs/cacert.pem"
ssl_certificate_verification => true
document_id => "%{[devices][name]}"
index => "sensordata-devices"
}
}
# send nextPageToken to this pipeline to process the next page
if([nextPageToken]) {
file {
path => "/opt/elastic/logstash/file_input/sensors-next-%{+yyyy-MM-dd}.json"
}
}
}
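For illustration, one line written by that file output (default json_lines codec) is a JSON document roughly like the following; the values here are made-up placeholders, and Logstash also adds fields such as @timestamp and @version:

{"nextPageToken":"TOKEN_FROM_LAST_RESPONSE","access_token":"BEARER_TOKEN","project_id":"PROJECT_ID"}

The file input at the top then reads this line back in and the pipeline requests the next page.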
1. The input reads data which can be a simple trigger from heartbeat or a trigger from the pipeline itself with a nextPageToken. I use a file input here, but you can also use Kafka or similar. Only the beats input plugin is not possible, as the pipeline cannot start the output when the input plugin was not started; this is required because the pipeline has to trigger itself.
2. Depending on the availability of the nextPageToken, the pipeline will either request the first page (without a token) or the next page (with a token) using the http filter.
3. If the response has a nextPageToken, we create a new event containing the token using the ruby filter, which is sent back to the pipeline to trigger loading the next page.
4. We split the response into separate events using the split filter.
5a. If the document does not contain the nextPageToken, this is a payload document and we send it to Elasticsearch.
5b. If the document contains a nextPageToken, this is the trigger for the next page, so we write it to a file to trigger the pipeline again.
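The trigger pipeline with the heartbeat input is not shown above; a minimal sketch, assuming a 10-minute interval and the same file_input directory (the login and other preparation steps are API-specific, so they are only hinted at), might look like this:

input {
  # fire a trigger event regularly; interval is in seconds
  heartbeat {
    interval => 600
    message => "trigger"
  }
}
filter {
  # the login and other required steps would go here, e.g. an http
  # filter call that sets [access_token]; omitted because it is API-specific
}
output {
  # write the trigger where the main pipeline's file input picks it up
  file {
    path => "/opt/elastic/logstash/file_input/sensors-init-%{+yyyy-MM-dd}.json"
  }
}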
Maybe this is not the most beautiful way but it works for us.
Best regards
Wolfram