Pipeline-to-pipeline communication - Logstash

Pipeline 1

input {
  kafka {
    bootstrap_servers => "172.20.188.11:9092,172.20.188.31:9092,172.20.188.30:9092"
    topics => ["DS_Test"]
    decorate_events => true
    group_id => "ds_test"
    codec => json
    max_poll_records => "100"
    consumer_threads => 2
    session_timeout_ms => "30000"
    auto_offset_reset => "earliest"
    client_id => "logstash-ingestion"
  }
}
filter
{
  mutate { remove_field => [ "@version" ] }   # the built-in field is lowercase @version
  mutate { remove_field => "@timestamp" }
  mutate { add_field => { "[@metadata][index]" => "%{[eventHeader][bpcName]}_%{[eventHeader][applicationId]}"} }
  mutate { add_field => { "[@metadata][ObjectCode]" => "%{[eventHeader][applicationId]}_%{[eventHeader][objectCode]}"} }
  mutate { add_field => { "[@metadata][action]" => "%{[event][action]}"} }
  mutate { add_field => { "[@metadata][applicationId]" => "%{[eventHeader][applicationId]}"} }
  mutate { add_field => { "[@metadata][itemId]" => "%{[eventData][itemId]}"} }
  mutate { add_field => { "[@metadata][isDeleted]" => "%{[eventData][isDeleted]}"} }
  mutate { add_field => { "[@metadata][manufacturer]" => "%{[eventData][manufacturerInfo][0][manufacturer][value]}"} }
  mutate { add_field => { "[@metadata][manufacturerPartNumber]" => "%{[eventData][manufacturerInfo][0][manufacturerPartNumber]}"} }
  mutate { add_field => { "[@metadata][category]" => "%{[eventData][categoryInfo][0][category][value]}"} }
  mutate { add_field => { "[@metadata][noun]" => "%{[eventData][attributes][noun][value]}"} }
  mutate { add_field => { "[@metadata][modifier]" => "%{[eventData][attributes][modifier][value]}"} }
  mutate { gsub => ["[@metadata][action]","INSERT","index"] }
  mutate { gsub => ["[@metadata][action]","Insert","index"] }
  mutate { gsub => ["[@metadata][action]","UPDATE","update"] }
  mutate { gsub => ["[@metadata][action]","Update","update"] }
  mutate { gsub => ["[@metadata][action]","DELETE","delete"] }
  mutate { gsub => ["[@metadata][action]","Delete","delete"] }
  mutate { remove_field => "[logstash]" }
  mutate { remove_field => "[metadata]" }
  mutate { remove_field => "[eventHeader]" }
  mutate { remove_field => "[event]" }
  mutate {
    copy => {
      "[@metadata][manufacturer]"           => "param_manufacturer"
      "[@metadata][manufacturerPartNumber]" => "param_manufacturerPartNumber"
      "[@metadata][category]"               => "param_category"
      "[@metadata][noun]"                   => "param_noun"
      "[@metadata][modifier]"               => "param_modifier"
    }
  }
 
  json { source => "message" }
  ruby {
    code => '
      # promote the keys of eventData to top-level fields, then drop eventData
      details_hash = event.get("eventData")
      return if details_hash.nil?
      details_hash.each do |k, v|
        event.set(k, v)
      end
      event.remove("eventData")

      # use next (not return) so one nil value does not abort the whole loop
      event.to_hash.each do |k, v|
        next if event.get(k).nil?
        event.set(k, v)
      end
    '
  }
  clone { clones => ["dataSource1"] }

  # for itemId
  if [type] == "dataSource1" and [@metadata][applicationId] != "62" {
    ruby {
      code => '
        event.set("field", "itemId")
        event.set("text", event.get("itemId"))
        event.set("contactcode", 0)
        event.set("nounModifier", "")
        event.set("appId", 62)
        event.set("type", "AS")
        event.set("normalizedText", "")
        event.get("isDeleted") == true ? event.set("softDelete", "Y") : event.set("softDelete", "N")
      '
    }
    prune {
      whitelist_names => [ "itemId", "IndexedOn", "field", "text", "softDelete", "normalizedText", "type", "appId", "nounModifier", "contactcode" ]
    }
    mutate { add_field => { "[@metadata][type]" => "dataSource1" } }
  }

}

output {
  if [@metadata][type] == "dataSource1" and [@metadata][applicationId] == "36" {
    elasticsearch {
      hosts => "http://172.20.188.15:9200"
      user => "elastic"
      password => "changeme"
      index => "flowersfoods_62"
      document_id => "%{[@metadata][ObjectCode]}_1"
      action => "%{[@metadata][action]}"
    }
  }
  stdout { codec => rubydebug { metadata => true } }
  # only the pipeline output itself belongs here; the input { stdin {} } / output wrapper is pipelines.yml config.string syntax
  pipeline { send_to => ["ds_test1"] }
}

========================================================

Pipeline 2


input {
  pipeline { address => "ds_test1" }
  http_poller {
    urls => {
      soap_request => {
        method => post
        url => "https://dm-dev-authenticationservice-serviceapps-use-rg.azurewebsites.net/api/v1/Token"
        headers => {
          "Content-Type" => "application/json"
        }
        body => '{"itemId":event.get("itemId")}'
      }
    }
    request_timeout => 60
    schedule => { cron => "* * * * * UTC" }
    codec => "json"
    metadata_target => "http_poller_metadata"
  }
}

filter
{
  mutate { add_field => { "[@metadata][itemId]" => "%{[event][itemId]}" } }
  mutate { add_field => { "status" => "%{[http_poller_metadata][code]}" } }
  clone { clones => ["dataSource"] }
  if [type] == "dataSource" {
    prune {
      whitelist_names => ["status"]
    }
    mutate { add_field => { "[@metadata][type]" => "dataSource" } }
  }
  json { source => "message" }
}
	
output {
  if [@metadata][type] == "dataSource" {
    elasticsearch {
      hosts => "http://172.20.188.15:9200"
      user => "elastic"
      password => "changeme"
      index => "flowersfoods_62"
      document_id => "%{[@metadata][ObjectCode]}_8"
      action => "index"
    }
  }
  stdout { codec => rubydebug { metadata => true } }
}

I want the event and @metadata of pipeline 1 inside pipeline 2, and I want to use them inside the http_poller, e.g. body => '{"itemId":event.get("itemId")}' in pipeline 2.

Did you have a question?

Yes, I want to access the events and @metadata of pipeline 1 in pipeline 2. How can I read those values?

My understanding is that the PipelineBus persists [@metadata] across pipeline connections, so it should just work. If you have a reproducible case where it does not work you might want to update this issue.
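For context, the pipeline input/output pair only connects pipelines running inside the same Logstash instance, so both configurations have to be declared in pipelines.yml; a minimal sketch, with the file paths being placeholders:

# pipelines.yml (paths are placeholders)
- pipeline.id: pipeline1
  path.config: "/etc/logstash/conf.d/pipeline1.conf"
- pipeline.id: pipeline2
  path.config: "/etc/logstash/conf.d/pipeline2.conf"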

filter
{
  mutate { add_field => { "[@metadata][action]" => "%{[event][action]}" } }
  mutate { add_field => { "[@metadata][applicationId]" => "%{[eventHeader][applicationId]}" } }
}

I want to read these @metadata values in pipeline 2. How can I read them?

The same way you would reference them in pipeline 1: sprintf references, conditionals, etc.
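For example, a minimal pipeline 2 filter using both styles, assuming the [@metadata][action] and [@metadata][applicationId] values set in pipeline 1 arrive intact (the field name forwarded_action and the tag app_62 are placeholders):

filter {
  # sprintf reference: copy a forwarded @metadata value into a visible field
  mutate { add_field => { "forwarded_action" => "%{[@metadata][action]}" } }

  # conditional: branch on a forwarded @metadata value
  if [@metadata][applicationId] == "62" {
    mutate { add_tag => ["app_62"] }
  }
}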

Hi,

Inside the filter of pipeline 2:

input {
pipeline { address => "ds_test1"}
}
filter {
mutate { add_field => { "test" => "%{[@metadata][ObjectCode]}"} }
}
output{.....}
I am assigning a value to "test", but in the debug output I am getting the value below:

"test" => "%{[@metadata][ObjectCode]}",

===================================pipeline 1=============
input {
kafka {.....}
}

filter
{
mutate { add_field => { "[@metadata][ObjectCode]" => "testvalue"} }
}
output
{
  stdout { codec => rubydebug { metadata => true } }
  pipeline { send_to => [ds_test1] }
}

Please let me know where I am wrong.

Maybe you are looking at events generated by the http_poller input in pipeline2. All I can say is that if I have a pipeline configured with

input { generator { count => 1 lines => [ '' ] } }

filter {
    mutate { add_field => { "[eventHeader][applicationId]" => "A" "[eventHeader][objectCode]" => 1 } }
    mutate { add_field => { "[@metadata][ObjectCode]" => "%{[eventHeader][applicationId]}_%{[eventHeader][objectCode]}"} }

    prune { whitelist_names => [ "itemId","IndexedOn","field","text","softDelete","normalizedText","type","appId","nounModifier","contactcode"] }
}

output { pipeline { send_to => [ds_test1] } }

and another configured with

input {
    pipeline { address => "ds_test1"}
}

filter {
    mutate { add_field => { "test" => "%{[@metadata][ObjectCode]}"} }
}

output{
    stdout { codec => rubydebug { metadata => true} }
}

I get

{
"@timestamp" => 2020-06-04T19:35:25.754Z,
      "test" => "A_1",
 "@metadata" => {
    "ObjectCode" => "A_1"
},
  "@version" => "1"
}
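One way to check whether that is what is happening: events created by the http_poller input in pipeline 2 carry the http_poller_metadata field set by metadata_target, while events forwarded from pipeline 1 do not, so the two sources can be told apart in the filter; a minimal sketch (the tag name is a placeholder):

filter {
  if [http_poller_metadata] {
    # event was generated by the http_poller input in pipeline 2
    mutate { add_tag => ["from_http_poller"] }
  } else {
    # event arrived over the pipeline input from pipeline 1
    mutate { add_field => { "test" => "%{[@metadata][ObjectCode]}" } }
  }
}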

Hi, I am not able to see
{
"@timestamp" => 2020-06-04T19:35:25.754Z,
"test" => "A_1",
"@metadata" => {
"ObjectCode" => "A_1"
},
"@version" => "1"
}
this output using rubydebug. Do I need to add anything?
