I am getting the following error and is resulting in a ingestion issue and cant seem to figure this out:
[2022-08-29T14:23:39,615][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:rtt_kfk2es-metaswitch_vsbc_cdr, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "{", "}" at line 56, column 7 (byte 2018) after filter {\n xml {\n source => "message"\n target => "cdr"\n }\n\n ruby {\n code => '\n cdr = event.get("cdr")\n cdr["call"].each_with_index do |call, index|\n\t\t# separate call.party into call.party.orig and call.party.dest\n event.remove("[cdr][call][#{index}][party]")\n call["party"].each do |party|\n if party["type"] == "orig"\n event.set("[cdr][call][#{index}][party][orig]", party)\n end\n if party["type"] == "term"\n event.set("[cdr][call][#{index}][party][term]", party)\n end\n end\n\t\t# Separate adjacency\n event.remove("[cdr][call][#{index}][adjacency]")\n call["adjacency"].each do |adjacency|\n if adjacency["type"] == "orig"\n event.set("[cdr][call][#{index}][adjacency][orig]", adjacency)\n end\n if adjacency["type"] == "term"\n event.set("[cdr][call][#{index}][adjacency][term]", adjacency)\n end\n if adjacency["type"] == "rejected"\n event.set("[cdr][call][#{index}][adjacency][rejected]", adjacency)\n end\n end\n\t\t# QoS[i].gate[j].flowinfo[k].RTCPstats\n\t\tif event.get("[cdr][call][#{index}][QoS]")\n\t\t\tcall["QoS"].each_with_index do |qos, qos_i|\n\t\t\t\tqos["gate"].each_with_index do |gate, gate_i|\n\t\t\t\t\tevent.remove("[cdr][call][#{index}][QoS][#{qos_i}][gate][#{gate_i}][flowinfo]")\n\t\t\t\t\tgate["flowinfo"].each_with_index do |flowinfo, flowinfo_i|\n\t\t\t\t\t'", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in
initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in
initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in
block in converge_state'"]}
Here is my conf. file:
input {
kafka {
bootstrap_servers => "wc9097"
topics => ["uc_metaswitch_sbc_cdr"]
group_id => "elk_ingestion"
consumer_threads => "2"
auto_offset_reset => "latest"
security_protocol => "SASL_SSL"
sasl_kerberos_service_name => "kafka"
ssl_truststore_location => "/opt/vsoConfig/pk.truststore.jks"
ssl_truststore_password => "1234567"
jaas_path => "/opt/vsoConfig/jaas_fid_auto.conf"
codec => plain
}
}
filter {
xml {
source => "message"
target => "cdr"
}
ruby {
code => '
cdr = event.get("cdr")
cdr["call"].each_with_index do |call, index|
# separate call.party into call.party.orig and call.party.dest
event.remove("[cdr][call][#{index}][party]")
call["party"].each do |party|
if party["type"] == "orig"
event.set("[cdr][call][#{index}][party][orig]", party)
end
if party["type"] == "term"
event.set("[cdr][call][#{index}][party][term]", party)
end
end
# Separate adjacency
event.remove("[cdr][call][#{index}][adjacency]")
call["adjacency"].each do |adjacency|
if adjacency["type"] == "orig"
event.set("[cdr][call][#{index}][adjacency][orig]", adjacency)
end
if adjacency["type"] == "term"
event.set("[cdr][call][#{index}][adjacency][term]", adjacency)
end
if adjacency["type"] == "rejected"
event.set("[cdr][call][#{index}][adjacency][rejected]", adjacency)
end
end
# QoS[i].gate[j].flowinfo[k].RTCPstats
if event.get("[cdr][call][#{index}][QoS]")
call["QoS"].each_with_index do |qos, qos_i|
qos["gate"].each_with_index do |gate, gate_i|
event.remove("[cdr][call][#{index}][QoS][#{qos_i}][gate][#{gate_i}][flowinfo]")
gate["flowinfo"].each_with_index do |flowinfo, flowinfo_i|
stats_txt = flowinfo["RTCPstats"][0]
flowinfo.delete("RTCPstats")
stats_array = stats_txt.split(", ")
stats = {}
stats_array.each do |stat|
kv = stat.split("=")
stats[kv[0]] = kv[1]
end
flowinfo["RTCPstats"] = stats
event.set("[cdr][call][#{index}][QoS][#{qos_i}][gate][#{gate_i}][flowinfo][#{flowinfo_i}]", flowinfo)
end
end
end
end
end
'
}
}
output {
elasticsearch{
hosts => ["https://pkl.ca:873/meta/"]
index => "uc-metaswitch-vsbc-cdr"
user => "${LOGSTASH_AD_USERNAME}"
password => "${LOGSTASH_AD_PWD}"
cacert => "/opt/vsoConfig/pk.cer"
}
}