How to add a for loop and if condition in logstash input plugin


(Bharath Pusuluri) #1

My requirement is as follows:

I am running the following Python script through the exec input. The script runs a curl command and prints the response, which Logstash reads as input in JSON format.
input {
  exec {
    type => "apps"
    command => "python /usr/share/logstash/pythontest.py"
    interval => "60"
    codec => "json"
  }
}

My concern is that the curl command targets the active server, which sometimes switches to standby. I would like to write an if condition in the input plugin: if the output of the command contains the message "This is a standby RM", another script should run immediately, and this should sit inside a for or while loop that runs at least 20 times. Is it possible to read the data and write this logic in the input plugin?


(Christian Dahlqvist) #2

It is not possible to have conditional logic within a plugin. Why do you not modify your Python script to handle this instead?


(Bharath Pusuluri) #3

#! /usr/bin/python
import subprocess
proc = subprocess.Popen(["curl", "hostname1:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
print out

The above is my current Python code, which runs as part of the Logstash input plugin defined above. Sometimes the curl command returns a message saying that hostname1 is a standby server, and in that case I should move on to the other server, hostname2.

Exact Message - This is standby RM. The redirect url is: /ws/v1/cluster/apps?state=RUNNING

I need to keep this message from reaching the Elasticsearch output, while the rest of the output is processed as it is.

import subprocess
proc = subprocess.Popen(["curl", "hostname2:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
print out

To satisfy this condition, I tried the following. The problem is that the input plugin reads both the first output and the second output, so I thought I could put a filter in the input plugin or add a separate filter. What would you suggest for this scenario of curl commands against active and standby servers?

#! /usr/bin/python
import subprocess
proc = subprocess.Popen(["curl", "hostname1:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
(out, err) = proc.communicate()
print out
if "standby RM" in out:
    proc = subprocess.Popen(["curl", "hostname2:8088/ws/v1/cluster/apps?state=RUNNING"], stdout=subprocess.PIPE)
    (out, err) = proc.communicate()
    print out
print "test succesful"
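Following the suggestion in #2, one way to keep the standby notice away from Logstash is to have the script print only a response that is not a standby notice, and to retry a bounded number of times. Below is a minimal sketch, assuming Python 3. The names `curl`, `fetch_from_active`, `main`, `HOSTS`, `STANDBY_MARKER` and the one-second pause are illustrative and do not come from the thread; the hosts, the path and the limit of 20 attempts are taken from the posts above.

```python
import subprocess
import sys
import time

# Hosts and path come from the posts above; the other names are illustrative.
HOSTS = ["hostname1:8088", "hostname2:8088"]
PATH = "/ws/v1/cluster/apps?state=RUNNING"
STANDBY_MARKER = "standby RM"


def curl(url):
    """Run curl silently and return its stdout as text."""
    result = subprocess.run(["curl", "-s", url], stdout=subprocess.PIPE)
    return result.stdout.decode("utf-8", errors="replace")


def fetch_from_active(hosts, fetch=curl, attempts=20, pause=1.0):
    """Try each host in turn, for up to `attempts` rounds, skipping standby replies."""
    for round_no in range(attempts):
        for host in hosts:
            out = fetch(host + PATH)
            if out and STANDBY_MARKER not in out:
                return out
        if round_no < attempts - 1:
            time.sleep(pause)  # every host answered as standby; wait, then retry
    return None


def main():
    """Print only the active server's response; print nothing otherwise."""
    body = fetch_from_active(HOSTS)
    if body is None:
        sys.exit(1)
    print(body)
```

A script used with the exec input would call `main()` as its last line. Because the standby reply is never printed, only the JSON body reaches the `json` codec. Passing `fetch` as a parameter is a design choice that lets the retry logic be exercised with a stub instead of a live server.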


(Bharath Pusuluri) #4

My input and filter look like this. Another problem is that remove_field => [ "amContainerLogs" ,"trackingUrl"] does not remove those fields before the data is loaded into Elasticsearch.

input {
  exec {
    type => "apps"
    command => "python /usr/share/logstash/pythontest.py"
    interval => "60"
    codec => "json"
  }
}

filter {
  if [type] == "apps" {
    json {
      source => "message"
      remove_field => [ "amContainerLogs" ,"trackingUrl"]
    }
    split { field => "[apps]" }
    split {
      field => "[apps][app]"
      remove_field => [ "command" ,"tags" ,"apps.app.amContainerLogs" ,"apps.app.trackingUrl"]
    }
    split { field => "[apps][app][resourceRequests]" }
  }
}


(Bharath Pusuluri) #5

Christian, can you help with how to filter out the text that comes before the JSON output, and also how to remove individual JSON fields?

I tried the json filter's remove_field option, but it is still not working as expected.
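If the fields are dropped in the Python script itself, as suggested in #2 for the standby handling, they never reach Logstash at all. The following is a minimal sketch, assuming Python 3 and assuming the response has the shape `{"apps": {"app": [...]}}`. The names `drop_app_fields` and `UNWANTED` are hypothetical; the two field names are the ones from the filter above.

```python
import json

# Field names taken from the remove_field setting in the post above.
UNWANTED = ("amContainerLogs", "trackingUrl")


def drop_app_fields(raw, unwanted=UNWANTED):
    """Parse the response text and delete the unwanted keys from every app entry."""
    data = json.loads(raw)
    # Tolerate a missing or null "apps" / "app" value by treating it as empty.
    apps = (data.get("apps") or {}).get("app") or []
    for app in apps:
        for key in unwanted:
            app.pop(key, None)  # no error if the key is absent
    return json.dumps(data)
```

The script would then print `drop_app_fields(out)` instead of the raw curl output, so the `json` codec only ever sees the trimmed document.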


(Christian Dahlqvist) #6

It would be easier if you could show what the data looks like and what the goal is.


(Bharath Pusuluri) #7

The JSON-format data looks like this:

<apps>
  <app>
    <id>application_1540397431401_63966</id>
    <user>prdodsapp</user>
    <name>
      distcp: oozie:action:T=java:W=falcon-dr-fs-workflow:A=dr-replication:ID=0019347-180619181447685-oozie-oozi-W
    </name>
    <queue>default</queue>
    <state>RUNNING</state>
    <finalStatus>UNDEFINED</finalStatus>
    <progress>81.26881</progress>
    <trackingUI>ApplicationMaster</trackingUI>
    <trackingUrl>
      http://hostname:8088/proxy/application_1540397431401_63966/
    </trackingUrl>
    <diagnostics/>
    <clusterId>1540397431401</clusterId>
    <applicationType>MAPREDUCE</applicationType>
    <applicationTags>oozie-82feffab11fb930af53ee1d8ca7db26c</applicationTags>
    <startedTime>1541005780354</startedTime>
    <finishedTime>0</finishedTime>
    <elapsedTime>1751383</elapsedTime>
    <amContainerLogs>
      http://hostname:8042/node/containerlogs/container_e81_1540397431401_63966_01_000001/prdodsapp
    </amContainerLogs>
    <amHostHttpAddress>hostname:8042</amHostHttpAddress>
    <allocatedMB>12960</allocatedMB>
    <allocatedVCores>4</allocatedVCores>
    <runningContainers>4</runningContainers>
    <memorySeconds>44958791</memorySeconds>
    <vcoreSeconds>13870</vcoreSeconds>
    <queueUsagePercentage>0.4974943</queueUsagePercentage>
    <clusterUsagePercentage>0.26367188</clusterUsagePercentage>
    <preemptedResourceMB>0</preemptedResourceMB>
    <preemptedResourceVCores>0</preemptedResourceVCores>
    <numNonAMContainerPreempted>0</numNonAMContainerPreempted>
    <numAMContainerPreempted>0</numAMContainerPreempted>
    <resourceRequests>
      <capability>
        <memory>3240</memory>
        <virtualCores>1</virtualCores>
      </capability>
      <nodeLabelExpression/>
      <numContainers>0</numContainers>
      <priority>
        <priority>0</priority>
      </priority>
      <relaxLocality>true</relaxLocality>
      <resourceName>*</resourceName>
    </resourceRequests>
    <resourceRequests>
      <capability>
        <memory>3240</memory>
        <virtualCores>1</virtualCores>
      </capability>
      <nodeLabelExpression/>
      <numContainers>0</numContainers>
      <priority>
        <priority>20</priority>
      </priority>
      <relaxLocality>true</relaxLocality>
      <resourceName>*</resourceName>
    </resourceRequests>
    <logAggregationStatus>NOT_START</logAggregationStatus>
  </app>


(Christian Dahlqvist) #8

That looks a lot like XML and not JSON.


(Bharath Pusuluri) #9

When we run the curl command on a Linux system, the output looks like this:

{"apps":{"app":[{"id":"application_1540397431401_63966","user":"prdodsapp","name":"distcp: oozie:action:T=java:W=falcon-dr-fs-workflow:A=dr-replication:ID=0019347-180619181447685-oozie-oozi-W","queue":"default","state":"RUNNING","finalStatus":"UNDEFINED","progress":84.73618,"trackingUI":"ApplicationMaster","trackingUrl":"http://hostname.com:8088/proxy/application_1540397431401_63966/","diagnostics":"","clusterId":1540397431401,"applicationType":"MAPREDUCE","applicationTags":"oozie-82feffab11fb930af53ee1d8ca7db26c","startedTime":1541005780354,"finishedTime":0,"elapsedTime":2249376,"amContainerLogs":"http://hostname:8042/node/containerlogs/container_e81_1540397431401_63966_01_000001/prdodsapp","amHostHttpAddress":"hostname:8042","allocatedMB":12960,"allocatedVCores":4,"runningContainers":4,"memorySeconds":51412664,"vcoreSeconds":15862,"queueUsagePercentage":0.4974943,"clusterUsagePercentage":0.26367188,"preemptedResourceMB":0,"preemptedResourceVCores":0,"numNonAMContainerPreempted":0,"numAMContainerPreempted":0,"resourceRequests":[{"capability":{"memory":3240,"virtualCores":1},"nodeLabelExpression":"","numContainers":0,"priority":{"priority":0},"relaxLocality":true,"resourceName":""},{"capability":{"memory":3240,"virtualCores":1},"nodeLabelExpression":"","numContainers":0,"priority":{"priority":20},"relaxLocality":true,"resourceName":""}],"logAggregationStatus":"NOT_START"}
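The XML in #7 and the JSON here may well be the same data in two formats. The sketch below assumes that the ResourceManager REST API selects its response format from the HTTP `Accept` header, which would explain a browser showing XML while curl shows JSON; treat that as an assumption to check against the Hadoop documentation. Under that assumption, the script can ask for JSON explicitly. This uses Python 3's standard `urllib.request` instead of curl; `build_json_request`, `fetch_json_text` and the `http://` scheme added to the URL are illustrative and not from the thread.

```python
import urllib.request

# Host and path from the posts above; the http:// scheme is an assumption.
URL = "http://hostname1:8088/ws/v1/cluster/apps?state=RUNNING"


def build_json_request(url):
    """Build a GET request that explicitly asks for a JSON response."""
    return urllib.request.Request(url, headers={"Accept": "application/json"})


def fetch_json_text(url, timeout=10):
    """Send the request and return the response body as text."""
    with urllib.request.urlopen(build_json_request(url), timeout=timeout) as resp:
        return resp.read().decode("utf-8")
```

Calling `fetch_json_text(URL)` contacts the server, while `build_json_request(URL)` only constructs the request, so the header can be inspected without any network access.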


(Bharath Pusuluri) #10

Updated another set

