How to test the result of a query and take an action based on that result?

Is there a way to test the result of a query and take an action based on that result?

I need to do the following:
For new coming documents, make a search on a index X to see if that document already exists and if it has 5 fields in common with;
if exists, update/replace the document on index X; if doesn't exists, add the document to the same index X.

At the end we have documents that has all these 5 fields different from any other document. The idea behind is to not repeat documents that has these 5 fields equal to each other.

How can I test if the result of query was successful and based on that result take an action (if-then-else), as I show below (skull from logstash pipeline):

I have the following pipeline:

filter {


	elasticsearch {
        hosts => ["hostnames"] 
        index => "talend_es_remote_nrep-%{+YYYY.MM.dd}"
        user => "write-indices-talend"        
        password => "*******"

        query_template => "talend_query_template.json"
		fields => {
            "rem_task_execution_id" => "rem_task_execution_id"
            "rem_connector_label" => "rem_connector_label"
            "rem_task_name" => "rem_task_name"
            "rem_connection_name" => "rem_connection_name"
            "name" => "name"

 -----> HOW TO DO THIS ?
	if "RESULTS FROM THE query template return 1 result" {
		flag = "to_update" 
	} else 	{
		flag = "to_add"
output {
	#stdout { codec => rubydebug { metadata => true } }
	if [flag] == "to_update" {
		elasticsearch {
			hosts => ["hostnames"]
			index => "talend_es_remote_nrep-%{+YYYY.MM.dd}"
			user => "write-indices-talend"
			password => "**"
			document_id => "%{rem_task_execution_id}"
			action => "update"
			doc_as_upsert => "true"
	} else if [flag] == "to_add" {
		elasticsearch {
			hosts => ["hostnames"] 
			index => "talend_es_remote_nrep-%{+YYYY.MM.dd}"
			user => "write-indices-talend"        
			password => "**"


My template "talend_query_template.json" has the content:

	  "size": 1,
	  "sort" : [ { "@timestamp" : "desc" } ],
	  "query": {
		"bool": {
		  "filter": [
			  "term": {
				"name": "%{name}"
			  "term": {
				"rem_task_execution_id": "%{rem_task_execution_id}"
			  "term": {
				"rem_task_name": "%{rem_task_name}"
			  "term": {
				"rem_connector_label": "%{rem_connector_label}"
			  "term": {
				"rem_connection_name": "%{rem_connection_name}"

Thank you
Best regards

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.