Pipeline1
kafka {
bootstrap_servers => "172.20.188.11:9092,172.20.188.31:9092,172.20.188.30:9092"
topics => ["DS_Test"]
decorate_events => true
group_id => "ds_test"
codec => json
max_poll_records => "100"
consumer_threads => 2
session_timeout_ms => "30000"
auto_offset_reset => "earliest"
client_id => "logstash-ingestion"
}
}
filter
{
mutate { remove_field => "@Version" }
mutate { remove_field => "@timestamp" }
mutate { add_field => { "[@metadata][index]" => "%{[eventHeader][bpcName]}_%{[eventHeader][applicationId]}"} }
mutate { add_field => { "[@metadata][ObjectCode]" => "%{[eventHeader][applicationId]}_%{[eventHeader][objectCode]}"} }
mutate { add_field => { "[@metadata][action]" => "%{[event][action]}"} }
mutate { add_field => { "[@metadata][applicationId]" => "%{[eventHeader][applicationId]}"} }
mutate { add_field => { "[@metadata][itemId]" => "%{[eventData][itemId]}"} }
mutate { add_field => { "[@metadata][isDeleted]" => "%{[eventData][isDeleted]}"} }
mutate { add_field => { "[@metadata][manufacturer]" => "%{[eventData][manufacturerInfo][0][manufacturer][value]}"} }
mutate { add_field => { "[@metadata][manufacturerPartNumber]" => "%{[eventData][manufacturerInfo][0][manufacturerPartNumber]}"} }
mutate { add_field => { "[@metadata][category]" => "%{[eventData][categoryInfo][0][category][value]}"} }
mutate { add_field => { "[@metadata][noun]" => "%{[eventData][attributes][noun][value]}"} }
mutate { add_field => { "[@metadata][modifier]" => "%{[eventData][attributes][modifier][value]}"} }
mutate { gsub => ["[@metadata][action]","INSERT","index"] }
mutate { gsub => ["[@metadata][action]","Insert","index"] }
mutate { gsub => ["[@metadata][action]","UPDATE","update"] }
mutate { gsub => ["[@metadata][action]","Update","update"] }
mutate { gsub => ["[@metadata][action]","DELETE","delete"] }
mutate { gsub => ["[@metadata][action]","Delete","delete"] }
mutate { remove_field => "[logstash]" }
mutate { remove_field => "[metadata]" }
mutate { remove_field => "[eventHeader]" }
mutate { remove_field => "[event]" }
mutate {
copy => {
"[@metadata][manufacturer]" => "param_manufacturer"
"[@metadata][manufacturerPartNumber]" => "param_manufacturerPartNumber"
"[@metadata][category]" => "param_category"
"[@metadata][noun]" => "param_noun"
"[@metadata][modifier]" => "param_modifier"
}
}
json{
source => "message"
}
ruby {
code => '
details_hash = event.get("eventData")
return if details_hash.nil?
details_hash.each do |k, v|
event.set(k,v)
end
event.remove("eventData")
hash = event.to_hash
hash.each {|k,v|
data = event.get(k)
return if data.nil?
event.set(k,v)
}'
}
clone{clones => ["dataSource1"]}
#for itemId
if [type] == "dataSource1" and [@metadata][applicationId] != "62"{
ruby {
code => 'event.set("field","itemId")
event.set("text",event.get("itemId"))
event.set("contactcode",0)
event.set("nounModifier","")
event.set("appId",62)
event.set("type","AS")
event.set("normalizedText","")
event.get("isDeleted") == true ? event.set("softDelete", "Y") : event.set("softDelete","N")'
}
prune {
whitelist_names => [ "itemId","IndexedOn","field","text","softDelete","normalizedText","type","appId","nounModifier","contactcode"]
}
mutate { add_field => { "[@metadata][type]" => "dataSource1" } }
}
}
output {
if [@metadata][type] == "dataSource1" and [@metadata][applicationId] == "36" {
elasticsearch {
hosts => "http://172.20.188.15:9200"
user => "elastic"
password => "changeme"
index => "flowersfoods_62"
document_id => "%{[@metadata][ObjectCode]}_1"
action => "%{[@metadata][action]}"
}
}
stdout { codec => rubydebug { metadata => true} }
pipeline input { stdin {} } output { pipeline { send_to => [ds_test1] } }
}
========================================================
pipeline2
input {
pipeline { address => "ds_test1"}
http_poller {
urls => {
soap_request => {
method => post
url => "https://dm-dev-authenticationservice-serviceapps-use-rg.azurewebsites.net/api/v1/Token"
headers => {
"Content-Type" => "application/json"
}
body =>
'{"itemId":event.get("itemId")}'
}
}
request_timeout => 60
schedule => { cron => "* * * * * UTC"}
codec => "json"
metadata_target => "http_poller_metadata"
}
}
filter
{
mutate { add_field => { "[@metadata][itemId]" => "%{[event][itemId]}"} }
mutate { add_field => { "status" => "%{[http_poller_metadata][code]}"} }
clone{clones => ["dataSource"]}
if [type] == "dataSource" {
prune {
whitelist_names => ["status"]
}
mutate { add_field => { "[@metadata][type]" => "dataSource" } }
}
json{ source=>"message" }
}
output{
if [@metadata][type] == "dataSource"
{
elasticsearch {
hosts => "http://172.20.188.15:9200"
user => "elastic"
password => "changeme"
index => "flowersfoods_62"
document_id => "%{[@metadata][ObjectCode]}_8"
action => "index"
}
}
stdout { codec => rubydebug { metadata => true} }
}
i want event,@metadata of pipeline 1 inside pipeline 2 and want to use them inside http_poller like body =>
'{"itemId":event.get("itemId")}' in pipeline 2