=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \"}\"

I am getting the following error, which is causing an ingestion issue, and I can't seem to figure it out:

[2022-08-29T14:23:39,615][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:rtt_kfk2es-metaswitch_vsbc_cdr, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "{", "}" at line 56, column 7 (byte 2018) after filter {\n xml {\n source => "message"\n target => "cdr"\n }\n\n ruby {\n code => '\n cdr = event.get("cdr")\n cdr["call"].each_with_index do |call, index|\n\t\t# separate call.party into call.party.orig and call.party.dest\n event.remove("[cdr][call][#{index}][party]")\n call["party"].each do |party|\n if party["type"] == "orig"\n event.set("[cdr][call][#{index}][party][orig]", party)\n end\n if party["type"] == "term"\n event.set("[cdr][call][#{index}][party][term]", party)\n end\n end\n\t\t# Separate adjacency\n event.remove("[cdr][call][#{index}][adjacency]")\n call["adjacency"].each do |adjacency|\n if adjacency["type"] == "orig"\n event.set("[cdr][call][#{index}][adjacency][orig]", adjacency)\n end\n if adjacency["type"] == "term"\n event.set("[cdr][call][#{index}][adjacency][term]", adjacency)\n end\n if adjacency["type"] == "rejected"\n event.set("[cdr][call][#{index}][adjacency][rejected]", adjacency)\n end\n end\n\t\t# QoS[i].gate[j].flowinfo[k].RTCPstats\n\t\tif event.get("[cdr][call][#{index}][QoS]")\n\t\t\tcall["QoS"].each_with_index do |qos, qos_i|\n\t\t\t\tqos["gate"].each_with_index do |gate, gate_i|\n\t\t\t\t\tevent.remove("[cdr][call][#{index}][QoS][#{qos_i}][gate][#{gate_i}][flowinfo]")\n\t\t\t\t\tgate["flowinfo"].each_with_index do |flowinfo, flowinfo_i|\n\t\t\t\t\t'", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", 
"/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in block in converge_state'"]}

Here is my conf. file:

input {
  # Consume the CDR topic from Kafka over SASL_SSL.
  kafka {
    # Broker and subscription
    bootstrap_servers => "wc9097"
    topics => ["uc_metaswitch_sbc_cdr"]

    # Consumer behaviour
    group_id => "elk_ingestion"
    consumer_threads => "2"
    auto_offset_reset => "latest"
    codec => plain

    # Transport security and authentication
    security_protocol => "SASL_SSL"
    sasl_kerberos_service_name => "kafka"
    jaas_path => "/opt/vsoConfig/jaas_fid_auto.conf"
    ssl_truststore_location => "/opt/vsoConfig/pk.truststore.jks"
    # NOTE(review): this password is hard-coded, whereas the elasticsearch
    # output in this file reads its credentials from ${...} variables.
    # Consider moving it to an environment variable or the keystore as well.
    ssl_truststore_password => "1234567"
  }
}

filter {
  # Parse the XML document held in "message" and store the result under [cdr].
  xml {
    source => "message"
    target => "cdr"
  }

  # Reshape every entry of [cdr][call]:
  #   * [party]     list -> sub-fields keyed by type (orig, term)
  #   * [adjacency] list -> sub-fields keyed by type (orig, term, rejected)
  #   * [QoS][i][gate][j][flowinfo][k][RTCPstats]: the first element, a
  #     "k1=v1, k2=v2" string, is replaced by a key/value hash
  # Elements that are missing are skipped instead of raising, so an event is
  # never left half-rewritten after its original fields were removed.
  #
  # NOTE: the Ruby source below is a single-quoted config string. It must not
  # contain a single-quote character anywhere (comments included); a stray
  # one ends the string early and the pipeline fails to compile with an
  # "Expected one of ..." ConfigurationError.
  ruby {
    code => '
      cdr = event.get("cdr")
      # Nothing to reshape when the xml filter produced no [cdr][call] list.
      calls = (cdr && cdr["call"]) || []

      calls.each_with_index do |call, index|
        call_ref = "[cdr][call][#{index}]"

        # Separate call.party into call.party.orig and call.party.term
        event.remove("#{call_ref}[party]")
        (call["party"] || []).each do |party|
          side = party["type"]
          if side == "orig" || side == "term"
            event.set("#{call_ref}[party][#{side}]", party)
          end
        end

        # Separate adjacency into orig / term / rejected
        event.remove("#{call_ref}[adjacency]")
        (call["adjacency"] || []).each do |adjacency|
          kind = adjacency["type"]
          if kind == "orig" || kind == "term" || kind == "rejected"
            event.set("#{call_ref}[adjacency][#{kind}]", adjacency)
          end
        end

        # QoS[i].gate[j].flowinfo[k].RTCPstats
        (call["QoS"] || []).each_with_index do |qos, qos_i|
          (qos["gate"] || []).each_with_index do |gate, gate_i|
            flow_ref = "#{call_ref}[QoS][#{qos_i}][gate][#{gate_i}][flowinfo]"
            event.remove(flow_ref)

            (gate["flowinfo"] || []).each_with_index do |flowinfo, flowinfo_i|
              stats_txt = (flowinfo["RTCPstats"] || [])[0]
              if stats_txt
                stats = {}
                stats_txt.split(", ").each do |stat|
                  # Limit of 2 keeps any "=" that is part of the value.
                  key, value = stat.split("=", 2)
                  stats[key] = value
                end
                flowinfo["RTCPstats"] = stats
              end
              # Written back even without stats, because the list was removed.
              event.set("#{flow_ref}[#{flowinfo_i}]", flowinfo)
            end
          end
        end
      end
    '
  }

}

output {
  # Index the reshaped CDR events into Elasticsearch over HTTPS.
  elasticsearch {
    # Destination
    hosts => ["https://pkl.ca:873/meta/"]
    index => "uc-metaswitch-vsbc-cdr"

    # TLS trust and credentials; user and password are ${...} references
    # that are substituted when the configuration is loaded.
    cacert => "/opt/vsoConfig/pk.cer"
    user => "${LOGSTASH_AD_USERNAME}"
    password => "${LOGSTASH_AD_PWD}"
  }
}

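This means there is a syntax error in your pipeline configuration, specifically somewhere in the filter section.
It is most likely in the ruby filter: the position reported in the error (line 56, column 7) appears to fall inside its single-quoted `code` string, and the quoted excerpt ends with a `'`, which suggests the string was closed earlier than intended.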

1 Like

Restarting the pipeline somehow made it work; it seems there wasn't any error in it after all.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.