I am integrating OpenTelemetry into my logging platform (ELK stack). An application sends logs -> OTel Collector (Docker) -> Logstash (Docker) -> Elasticsearch (Docker).
I am running into an issue when storing the metrics in an Elasticsearch index from Logstash. Here is my current logstash.conf file:
input {
  http {
    port => 5045
    codec => "json"
    tags => ["otel"]
  }
}
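# The OTel Collector POSTs OTLP/JSON batches (resourceSpans / resourceMetrics)
# to the http input above; the ruby filter below unpacks them.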
filter {
  ruby {
    code => '
      require "time"

      otel_trace = false
      otel_metric = false
      service_name = nil
      extracted_metrics = []

      # --- Extract traces ---
      if event.get("resourceSpans")
        event.get("resourceSpans").each do |resource_span|
          resource_attrs = {}
          if resource_span["resource"] && resource_span["resource"]["attributes"]
            resource_span["resource"]["attributes"].each do |attr|
              # Fall back with || (hash access returns nil rather than raising,
              # so rescue-based fallbacks never trigger and intValue is lost)
              value = attr["value"]["stringValue"] || attr["value"]["intValue"] if attr["value"]
              resource_attrs[attr["key"]] = value if value
            end
          end
          service_name = resource_attrs["service.name"] if resource_attrs["service.name"]
          if resource_span["scopeSpans"]
            resource_span["scopeSpans"].each do |scope|
              next unless scope["spans"]
              otel_trace = true
              scope["spans"].each do |span|
                extracted_attributes = {}
                # Trace details
                trace_id = span["traceId"] # currently unused
                span_name = span["name"]
                start_time_ns = span["startTimeUnixNano"].to_i
                end_time_ns = span["endTimeUnixNano"].to_i
                # Convert nanosecond timestamps to a human-readable format
                formatted_start_time = Time.at(start_time_ns / 1_000_000_000).utc.strftime("%Y-%m-%d %H:%M:%S")
                formatted_end_time = Time.at(end_time_ns / 1_000_000_000).utc.strftime("%Y-%m-%d %H:%M:%S")
                # Span attributes
                server_port = nil
                if span["attributes"]
                  span["attributes"].each do |attr|
                    value = attr["value"]["stringValue"] || attr["value"]["intValue"] if attr["value"]
                    extracted_attributes[attr["key"]] = value if value
                    # Explicitly extract the server port as an integer
                    server_port = value.to_i if attr["key"] == "server.port" && value
                  end
                end
                # Map the extracted fields onto the event (note: when a batch
                # carries several spans, later spans overwrite earlier ones)
                event.set("http_url_name", span_name)
                event.set("http_url_path", extracted_attributes["url.path"])
                event.set("server_address", extracted_attributes["server.address"])
                event.set("server_port", server_port)
                event.set("user_agent_original", extracted_attributes["user_agent.original"])
                event.set("start_time", formatted_start_time)
                event.set("end_time", formatted_end_time)
              end
            end
          end
        end
      end
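      # NOTE: OTLP/JSON serializes 64-bit integers (intValue, asInt,
      # timeUnixNano, startTimeUnixNano) as JSON strings, which is why this
      # script converts them explicitly with .to_i and .to_f.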
      # --- Extract metrics ---
      if event.get("resourceMetrics")
        event.get("resourceMetrics").each do |resource_metric|
          resource_attrs = {}
          if resource_metric["resource"] && resource_metric["resource"]["attributes"]
            resource_metric["resource"]["attributes"].each do |attr|
              value = attr["value"]["stringValue"] || attr["value"]["intValue"] if attr["value"]
              resource_attrs[attr["key"]] = value if value
            end
          end
          service_name = resource_attrs["service.name"] if resource_attrs["service.name"]
          if resource_metric["scopeMetrics"]
            resource_metric["scopeMetrics"].each do |scope|
              next unless scope["metrics"]
              otel_metric = true
              scope["metrics"].each do |metric|
                metric_name = metric["name"]
                metric_description = metric["description"]
                metric_unit = metric["unit"]
                metric_value = nil
                metric_timestamp_ns = nil
                # Note: only the last data point of each metric is kept
                if metric["sum"] && metric["sum"]["dataPoints"]
                  metric["sum"]["dataPoints"].each do |dp|
                    metric_value = dp["asInt"] || dp["asDouble"]
                    metric_timestamp_ns = dp["timeUnixNano"].to_i
                  end
                elsif metric["gauge"] && metric["gauge"]["dataPoints"]
                  metric["gauge"]["dataPoints"].each do |dp|
                    metric_value = dp["asInt"] || dp["asDouble"]
                    metric_timestamp_ns = dp["timeUnixNano"].to_i
                  end
                elsif metric["histogram"] && metric["histogram"]["dataPoints"]
                  metric["histogram"]["dataPoints"].each do |dp|
                    metric_value = dp["sum"] # buckets are dropped
                    metric_timestamp_ns = dp["timeUnixNano"].to_i
                  end
                end
                # Guard: skip metric types with no branch above (for example
                # summary or exponentialHistogram). Without this guard,
                # metric_timestamp_ns stays nil, nil / 1_000_000 raises
                # NoMethodError, the whole ruby filter aborts, and the event
                # is tagged _rubyexception instead of otel_metric -- so no
                # metrics ever reach Elasticsearch.
                next if metric_timestamp_ns.nil?
                extracted_metrics << {
                  "metric_name" => metric_name,
                  "metric_description" => metric_description,
                  "metric_unit" => metric_unit,
                  "metric_value" => metric_value.to_f,
                  "metric_timestamp" => metric_timestamp_ns / 1_000_000
                }
              end
            end
          end
        end
      end
event.set("service_name", service_name) if service_name
event.tag("otel_trace") if otel_trace
event.tag("otel_metric") if otel_metric
event.set("metrics", extracted_metrics) unless extracted_metrics.empty?
'
}
# Keep only required fields and remove unnecessary ones
mutate {
remove_field => ["resourceSpans","resourceMetrics", "@version", "host", "trace_id", "span_id", "parent_span_id", "flags", "kind"]
}
# Rename HTTP request fields correctly
mutate {
rename => {
"[http][request][body][bytes]" => "content_length"
"[http][request][mime_type]" => "mime_type"
}
}
}
output {
  # Debug: print metric events to stdout
  if "otel_metric" in [tags] {
    stdout { codec => rubydebug }
  }
  # Store traces in Elasticsearch
  if "otel_trace" in [tags] {
    elasticsearch {
      hosts    => ["http://elasticsearch:9200"]
      index    => "otel-traces-%{+YYYY.MM.dd}"
      user     => "elastic"
      password => "my-password"
    }
  }
  # Store metrics in Elasticsearch
  if "otel_metric" in [tags] {
    elasticsearch {
      hosts    => ["http://elasticsearch:9200"]
      index    => "otel-metrics-%{+YYYY.MM.dd}"
      user     => "elastic"
      password => "my-password"
    }
  }
}
Traces are working well, but metrics are not being stored, and I have not been able to figure out why. Does anyone have code suggestions for getting the metrics into Elasticsearch? Any logstash.conf template would be appreciated!
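In case it helps frame an answer, here is a minimal, untested sketch of the direction I was considering for the metrics path: split the metrics array so each metric becomes its own Elasticsearch document, and promote the metric timestamp to @timestamp. The split and date filters are stock Logstash plugins; the field names come from my config above.

filter {
  if "otel_metric" in [tags] {
    # Emit one event (one Elasticsearch document) per extracted metric
    split {
      field => "metrics"
    }
    # Use the metric timestamp (epoch milliseconds) as the event @timestamp
    date {
      match  => ["[metrics][metric_timestamp]", "UNIX_MS"]
      target => "@timestamp"
    }
  }
}

My thinking is that one flat document per metric avoids indexing an array of objects and makes the otel-metrics-* index easier to query in Kibana, but corrections are welcome.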