How to add a for loop and an if condition in a Logstash input plugin

My requirement is as follows:

I am running the following Python script from the input section. The script runs a curl command and outputs the resulting data (in JSON format) as the input.
input {
  exec {
    type => "apps"
    command => "python /usr/share/logstash/pythontest.py"
    interval => "60"
    codec => "json"
  }
}

My concern is that the curl command is run against the active server, and sometimes that server switches to standby. So I would like to write an if condition in the input plugin: if the output of the command contains the message "This is standby RM", another script needs to run immediately, and this should be put in a for or while loop that runs at least 20 times. Is it possible to read the data and write this kind of logic in an input plugin?

It is not possible to have conditional logic within a plugin. Why not modify your Python script to handle this instead?
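A minimal sketch of that suggestion, assuming Python 3 and curl on the PATH: the script itself loops up to 20 times, alternating between hostname1 and hostname2, and stops at the first response that does not contain the standby message. The names fetch_apps, fetch_from_active, main, MAX_ATTEMPTS and the 3-second delay are illustrative choices, not anything Logstash requires.

```python
import subprocess
import sys
import time

# Hypothetical configuration, based on the hosts and URL in the question
HOSTS = ["hostname1", "hostname2"]
APPS_PATH = ":8088/ws/v1/cluster/apps?state=RUNNING"
MAX_ATTEMPTS = 20
STANDBY_MARKER = "standby RM"


def fetch_apps(host):
    """Run curl against one ResourceManager and return its output as text."""
    result = subprocess.run(["curl", "-s", host + APPS_PATH], stdout=subprocess.PIPE)
    return result.stdout.decode("utf-8")


def fetch_from_active(hosts, fetch=fetch_apps, attempts=MAX_ATTEMPTS, delay=3.0):
    """Try the hosts in turn until one answers without the standby message.

    Returns the first usable response, or None if every attempt failed.
    """
    for attempt in range(attempts):
        host = hosts[attempt % len(hosts)]  # hostname1, hostname2, hostname1, ...
        out = fetch(host)
        if out and STANDBY_MARKER not in out:
            return out
        # Status messages go to stderr, not stdout, so they stay out of the event
        sys.stderr.write("attempt %d: %s is standby or unreachable\n" % (attempt + 1, host))
        if delay and attempt + 1 < attempts:
            time.sleep(delay)  # no sleep after the final attempt
    return None


def main():
    response = fetch_from_active(HOSTS)
    if response is not None:
        print(response)  # the only thing written to stdout
```

pythontest.py would then end with a call to main() (for example under if __name__ == "__main__":). With the default delay, 20 attempts can take close to a minute, which is about the same as the exec interval of 60 seconds, so a shorter delay or fewer attempts may be safer.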

#! /usr/bin/python
import subprocess
proc = subprocess.Popen(["curl", "hostname1:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
print out

The above is my current Python code, which runs as part of the Logstash input plugin defined above. Sometimes the curl command returns a message saying that hostname1 is a standby server, and when that happens I should move on to the other server, hostname2.

Exact Message - This is standby RM. The redirect url is: /ws/v1/cluster/apps?state=RUNNING

I need to keep this message from being passed to the Elasticsearch output, while the output of the subsequent request should be processed as is.

import subprocess
proc = subprocess.Popen(["curl", "hostname2:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
print out

To satisfy this condition, I tried the following. The problem now is that the input plugin reads both the first output and the second output. I thought I could add some kind of filter in the input plugin. What would you suggest for this scenario, where curl is run against active and standby servers?

#! /usr/bin/python
import subprocess
proc = subprocess.Popen(["curl", "hostname1:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
print out
if "standby RM" in out:
    proc = subprocess.Popen(["curl", "hostname2:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
    (out, err) = proc.communicate()
    print out
    print "test successful"

My input and filter look like this. Another problem is that remove_field => [ "amContainerLogs" ,"trackingUrl"] does not keep those fields from being loaded into Elasticsearch.

input {
  exec {
    type => "apps"
    command => "python /usr/share/logstash/pythontest.py"
    interval => "60"
    codec => "json"
  }
}

filter {
  if [type] == "apps" {
    json {
      source => "message"
      remove_field => [ "amContainerLogs" ,"trackingUrl"]
    }
    split { field => "[apps]" }
    split {
      field => "[apps][app]"
      remove_field => [ "command" ,"tags" ,"apps.app.amContainerLogs" ,"apps.app.trackingUrl"]
    }
    split { field => "[apps][app][resourceRequests]" }
  }
}
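One way around remove_field is to drop the unwanted fields in the script, while the data is still a Python dict. A sketch assuming Python 3 and a response of the form {"apps": {"app": [...]}}; strip_fields, clean_response and UNWANTED are illustrative names. (In the Logstash config itself, nested fields are, I believe, addressed with bracket syntax such as [apps][app][amContainerLogs]; a dotted name like apps.app.amContainerLogs refers to one top-level field with dots in its name.)

```python
import json

# Fields the question wants to keep out of Elasticsearch
UNWANTED = ("amContainerLogs", "trackingUrl")


def strip_fields(doc, unwanted=UNWANTED):
    """Remove unwanted keys from every app in a {"apps": {"app": [...]}} document."""
    apps = (doc.get("apps") or {}).get("app") or []  # tolerate a null or empty "apps"
    for app in apps:
        for key in unwanted:
            app.pop(key, None)  # ignore keys that are already absent
    return doc


def clean_response(text):
    """Parse curl's JSON output, strip the fields, and re-serialise it."""
    return json.dumps(strip_fields(json.loads(text)))
```

The script would then print(clean_response(out)) instead of print(out).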

Christian, can you help with how to filter out the text that comes before the JSON output, and also how to remove specific fields from the JSON data?

I did try the remove_field option of the json filter, but it is still not working as expected.
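For text printed before the JSON, the script could skip everything up to the first "{" and decode one JSON value from there with json.JSONDecoder.raw_decode, which also ignores anything after the object. A sketch assuming Python 3 and that the leading text itself contains no "{"; extract_json is a hypothetical name.

```python
import json


def extract_json(text):
    """Return the first JSON object found in `text`, or None if there is none.

    Anything before the first "{" (e.g. a standby notice) is skipped;
    raw_decode also ignores trailing text after the object.
    """
    start = text.find("{")
    if start == -1:
        return None
    try:
        obj, _end = json.JSONDecoder().raw_decode(text, start)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    return obj
```

If it returns None, the script can simply print nothing, so no event is created for that run.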

It would be easier if you could show what the data looks like and what the goal is.

The data in JSON format looks like this:

<apps>
  <app>
    <id>application_1540397431401_63966</id>
    <user>prdodsapp</user>
    <name>distcp: oozie:action:T=java:W=falcon-dr-fs-workflow:A=dr-replication:ID=0019347-180619181447685-oozie-oozi-W</name>
    <queue>default</queue>
    <state>RUNNING</state>
    <finalStatus>UNDEFINED</finalStatus>
    <progress>81.26881</progress>
    <trackingUI>ApplicationMaster</trackingUI>
    <trackingUrl>http://hostname:8088/proxy/application_1540397431401_63966/</trackingUrl>
    <diagnostics/>
    <clusterId>1540397431401</clusterId>
    <applicationType>MAPREDUCE</applicationType>
    <applicationTags>oozie-82feffab11fb930af53ee1d8ca7db26c</applicationTags>
    <startedTime>1541005780354</startedTime>
    <finishedTime>0</finishedTime>
    <elapsedTime>1751383</elapsedTime>
    <amContainerLogs>http://hostname:8042/node/containerlogs/container_e81_1540397431401_63966_01_000001/prdodsapp</amContainerLogs>
    <amHostHttpAddress>hostname:8042</amHostHttpAddress>
    <allocatedMB>12960</allocatedMB>
    <allocatedVCores>4</allocatedVCores>
    <runningContainers>4</runningContainers>
    <memorySeconds>44958791</memorySeconds>
    <vcoreSeconds>13870</vcoreSeconds>
    <queueUsagePercentage>0.4974943</queueUsagePercentage>
    <clusterUsagePercentage>0.26367188</clusterUsagePercentage>
    <preemptedResourceMB>0</preemptedResourceMB>
    <preemptedResourceVCores>0</preemptedResourceVCores>
    <numNonAMContainerPreempted>0</numNonAMContainerPreempted>
    <numAMContainerPreempted>0</numAMContainerPreempted>
    <resourceRequests>
      <capability>
        <memory>3240</memory>
        <virtualCores>1</virtualCores>
      </capability>
      <nodeLabelExpression/>
      <numContainers>0</numContainers>
      <priority>
        <priority>0</priority>
      </priority>
      <relaxLocality>true</relaxLocality>
      <resourceName>*</resourceName>
    </resourceRequests>
    <resourceRequests>
      <capability>
        <memory>3240</memory>
        <virtualCores>1</virtualCores>
      </capability>
      <nodeLabelExpression/>
      <numContainers>0</numContainers>
      <priority>
        <priority>20</priority>
      </priority>
      <relaxLocality>true</relaxLocality>
      <resourceName>*</resourceName>
    </resourceRequests>
    <logAggregationStatus>NOT_START</logAggregationStatus>
  </app>

That looks a lot like XML and not JSON.

When we run the curl command on the Linux system, the output looks like this:

{"apps":{"app":[{"id":"application_1540397431401_63966","user":"prdodsapp","name":"distcp: oozie:action:T=java:W=falcon-dr-fs-workflow:A=dr-replication:ID=0019347-180619181447685-oozie-oozi-W","queue":"default","state":"RUNNING","finalStatus":"UNDEFINED","progress":84.73618,"trackingUI":"ApplicationMaster","trackingUrl":"http://hostname.com:8088/proxy/application_1540397431401_63966/","diagnostics":"","clusterId":1540397431401,"applicationType":"MAPREDUCE","applicationTags":"oozie-82feffab11fb930af53ee1d8ca7db26c","startedTime":1541005780354,"finishedTime":0,"elapsedTime":2249376,"amContainerLogs":"http://hostname:8042/node/containerlogs/container_e81_1540397431401_63966_01_000001/prdodsapp","amHostHttpAddress":"hostname:8042","allocatedMB":12960,"allocatedVCores":4,"runningContainers":4,"memorySeconds":51412664,"vcoreSeconds":15862,"queueUsagePercentage":0.4974943,"clusterUsagePercentage":0.26367188,"preemptedResourceMB":0,"preemptedResourceVCores":0,"numNonAMContainerPreempted":0,"numAMContainerPreempted":0,"resourceRequests":[{"capability":{"memory":3240,"virtualCores":1},"nodeLabelExpression":"","numContainers":0,"priority":{"priority":0},"relaxLocality":true,"resourceName":""},{"capability":{"memory":3240,"virtualCores":1},"nodeLabelExpression":"","numContainers":0,"priority":{"priority":20},"relaxLocality":true,"resourceName":""}],"logAggregationStatus":"NOT_START"}

Updated another set
