Add transaction.name to spans / Merge documents of span and transaction

Hello Elastic Observability > APM engineers,

I have multiple agents integrated with multiple types of applications using multiple methods, such as Node.js, Python, and Java agents, and some of the applications are instrumented using OpenTelemetry.

I want to have transaction.name in my span documents instead of just transaction.id.

How can I achieve that, so I can build a case on it?

Thanks for reaching out, @kishorkumar,

Would using OpenTelemetry's attribute mapping work for you? Here is an example of what this could look like:

from opentelemetry import trace
from opentelemetry.trace import SpanKind

tracer = trace.get_tracer(__name__)

# A SERVER-kind (root) span is mapped to an APM transaction, and its name becomes transaction.name.
with tracer.start_as_current_span("name-of-transaction", kind=SpanKind.SERVER) as span:
    span.set_attribute("example.attribute", "value")  # custom attributes are stored as labels on the APM event
    # ... application work happens here ...

Hi @jessgarson,

No, I have not tried OpenTelemetry for this yet, as I am looking for a single solution that covers all the APM agents from the Elasticsearch side itself, not in the code bases, whether an application is instrumented with OpenTelemetry, Node.js, Python, Java, etc. It should be done from the Elasticsearch end, not at the code level.

But I will try that with OpenTelemetry as well. There are other agents too, though, so how would we enable this for them?

Thanks for providing that additional context, @kishorkumar. From an Elasticsearch perspective, have you considered using an ingest pipeline or transforms for this purpose?

Hi,
I have tried via **Transform**, and it looks impossible, as span and transaction are in the same index but in different documents.

Yes, in an ingest pipeline it was possible via an enrich processor and an enrich policy, but since we have the data stream and would use index: apm-traces-* in the enrich policy, it throws the data-too-large exception: circuit_breaking_exception.

Only if we can put the month or the day in our index pattern, as in traces-apm-{{year}}-{{month}}-{{day}}-*, will the enrich policy execute, and we might then get the results.

Have you ever come across this kind of use case, where you need one document that merges the fields of two different documents from the same data stream index, via a transform or any other method?

Thanks for your follow-up, @kishorkumar.

I was thinking about something like this which includes an enrich processor:

PUT _ingest/pipeline/add-transaction-name-to-spans
{
  "processors": [
    {
      "enrich": {
        "field": "transaction.id",
        "policy_name": "transaction-name-policy",
        "target_field": "transaction_enriched",
        "max_matches": 1,
        "ignore_missing": true
      }
    }
  ]
}
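
To sanity-check the pipeline before attaching it to your traces data stream, you could simulate it against a sample span document. This assumes the transaction-name-policy enrich policy already exists and has been executed; the sample field values below are made up, so replace them with a real transaction.id from your data to see the transaction_enriched field appear:

POST _ingest/pipeline/add-transaction-name-to-spans/_simulate
{
  "docs": [
    {
      "_source": {
        "processor": { "event": "span" },
        "span": { "id": "sample-span-id", "name": "sample span" },
        "transaction": { "id": "sample-transaction-id" }
      }
    }
  ]
}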

Do you have a code sample you are working with? Also, what version are you using?

Yes, I have tried the ingest pipeline with the enrich processor (without the max_matches attribute). To use the enrich processor we need to create an enrich policy with that name, and when we try to execute that policy via _execute, it gives us the data-too-large exception.
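
For reference, executing the policy and then checking the resulting task looks roughly like this (using the policy name from the example above; the task ID placeholder stands for whatever the execute call returns):

PUT _enrich/policy/transaction-name-policy/_execute?wait_for_completion=false

GET _tasks/<task-id-from-the-execute-response>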

That makes sense, @kishorkumar. Have you considered batch processing or using an interval-based approach?

What do you suggest for the batch processing or interval-based approach? Could you provide any reference for that?

Elastic Version: 8.11.1

I was considering smaller pipelines for varying time frames. Perhaps something like this:

PUT _enrich/policy/transaction-name-policy-2025-01
{
  "match": {
    "indices": ["apm-traces-2025-01-*"],
    "match_field": "transaction.id",
    "enrich_fields": ["transaction.name"]
  }
}

I have tried that, but the policy is unable to execute on that too, even when I tried it with a query to bound it to now-15m:

PUT /_enrich/policy/enrich_transaction_name
{
  "match": {
    "indices": ".ds*traces-apm*2025.03.*",
    "match_field": "transaction.id",
    "enrich_fields": [
      "transaction.name"
    ],
    "query": {
      "bool": {
        "must": [
          {
            "term": { 
              "processor.event": "transaction"
            }
          },
          {
            "exists": {
              "field": "transaction.id"
            }
          },
          {
            "range": {
              "@timestamp": {
                "gte": "now-15m",
                "lt":"now"
              }
            }
          }
        ]
      }
    }
  }
}

And when we execute the policy, it shows the task with completed=false, and after some time we get:

{
 "error": {
   "root_cause": [
     {
       "type": "resource_not_found_exception",
       "reason": "task [jg:619029220] isn't running and hasn't stored its results"
     }
   ],
   "type": "resource_not_found_exception",
   "reason": "task [jg:619029220] isn't running and hasn't stored its results"
 },
 "status": 404
}

And when we provide a particular index, it will run, but that's not what we need, because we are looking for a general solution. With this approach we will face the issue of changing the month every time, and even if we go index-based, it also needs to be changed after every rollover.

Thanks, @kishorkumar; I'm going to check in with some coworkers on this one. I'll get back in touch once I hear more.

I'm interested in testing this to get to the bottom of it. I tried testing with smaller samples, but I'd like to know your dataset's size. Please provide further context on how your data is structured and how I could recreate it.

It's standard APM data from 20+ applications, similar to the transactions data on demo.elastic.co. The only difference is that I have a much larger data volume, around 200GB to 400GB per day, without any replicas.

I have a 30-day retention policy, where data older than 14 days moves to the frozen tier.

In total, this means:
14 × 400 GB = 5,600 GB, roughly 5.5 TB

The applications include Java, Python, and Node.js. Since this data is highly sensitive, I can't share it, but you can use the given size estimates. The data structure is the same as demo.elastic.co.


Thanks, @kishorkumar. I did play around with some of my sample datasets, which I work with often, but I'll continue to explore and test here.

Thanks again for all your follow-up here. I was thinking about this a bit deeper, and I was wondering if you considered using runtime fields like so:

POST apm-traces-*/_search
{
  "runtime_mappings": {
    "transaction.name": {
      "type": "keyword",
      "script": {
        "source": """
          if (doc.containsKey('parent.id')) {
            def tx = params['_source']['transaction']['name'];
            emit(tx);
          }
        """
      }
    }
  },
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "processor.event": "span"
          }
        }
      ]
    }
  },
  "_source": ["transaction.name", "span.name", "trace.id"]
}

I'm a bit confused about how it will work. Specifically:

  • Can a runtime field fetch or reference data (like the transaction name) from other documents, given that span and transaction are two different records rather than the same one?
  • Will runtime fields display in cases?
  • I tried running the GET apm-traces*/_search query that you shared above, but I don't see the transaction name in the returned documents.

Also, I have around 700,000 to 800,000 documents per minute for spans alone.

But I can't find any solution to do this through Elasticsearch itself.

I'm considering using Logstash with a simple pipeline (Elasticsearch input and filter), but I need help understanding how to properly configure it to handle this scale of data ingestion.


Thanks, @kishorkumar. I was thinking of Logstash as well, but I did wonder about the scale of your data. Do you have a code example of what you were considering?

Logstash version is 8.11 and Elasticsearch is 8.11.

Here is my pipeline:

input {
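   # Pull the last minute of span documents that carry a transaction.id (excluding span.action:query), on a once-a-minute schedule.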
   elasticsearch {
    hosts => ["" ]
    user => ""
    password => ""
    ssl_enabled => true
    ssl_verification_mode => "none"
    index => ".ds-traces-apm-*"
    query => '{"_source":["span.id","span.name","transaction.id"],"query":{"bool":{"must":[{"exists":{"field":"transaction.id"}},{"term":{"processor.event":"span"}},{"range":{"@timestamp":{"gte":"now-1m","lte":"now"}}}],"must_not":[{"term":{"span.action":"query"}}]}}}'
    schedule => "* * * * *"
    docinfo => true
    #size => 100
    #scroll => "2m"
  }
}
filter {
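  # Look up the transaction document with the same transaction.id and copy its transaction.name onto the span event.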
  if [transaction][id] {
    elasticsearch {
      hosts => ["" ]
      user => ""
      password => ""
      ssl_enabled => true
      ssl_verification_mode => "none"
      index => ".ds-traces-apm-*"
      query => 'processor.event:transaction AND transaction.id:"%{[transaction][id]}"'
      fields =>  { "[transaction][name]" => "[transaction][name]" }
      tag_on_failure => ["_transaction_id_lookup_failure"]
    }
  }
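  # The fingerprint below builds a deterministic ID from span.id + transaction.id, so each span maps to exactly one output document.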
 
  fingerprint {
    source => ["[span][id]", "[transaction][id]"]
    target => "uuid"
    method => "MD5"
    concatenate_sources => true
  }

}
output {
    #stdout { codec => rubydebug }
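     # Upsert the enriched events into a dedicated index, using the fingerprint as the document _id.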
     elasticsearch {
        document_id => "%{uuid}"
        hosts => ["" ]
        user => ""
        password => ""
        doc_as_upsert => true
        action => update
        index => "transaction_span_enrichment"
        ssl_enabled => true
        ssl_verification_mode => "none"
    }
}

I have now reduced the number of spans, by filtering out records, to approximately 350,000 per minute.

Given this ingestion rate of 350,000 per minute, I need to estimate the optimal number of Logstash workers, keeping in mind that it should not exceed the number of CPU cores on the server or machine.

In addition, I need guidance on tuning:

  • Batch size
  • Queue size
  • Other related performance parameters

Also, I want to ensure that the JVM heap size for Logstash is configured correctly; ideally, it should be no less than 4 GB and no more than 8 GB. I don't consider this to be a very high load, but when filters are applied, processing time may increase, so optimization is necessary.
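
As a starting point, I am thinking of settings along these lines (the numbers are my own assumptions to validate against the actual hardware and measured throughput, not recommendations):

# logstash.yml (or per pipeline in pipelines.yml) - starting values, to be tuned by measurement
pipeline.workers: 8            # assumption: at most the number of CPU cores on the Logstash host
pipeline.batch.size: 500       # events per worker per batch; larger batches mean fewer, bigger bulk requests
pipeline.batch.delay: 50       # milliseconds to wait before flushing an undersized batch
queue.type: persisted          # optional disk-backed queue to absorb ingest spikes
queue.max_bytes: 4gb

# config/jvm.options - heap within the 4-8 GB range mentioned above (Xms and Xmx kept equal)
-Xms8g
-Xmx8g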

Finally, I need to match the number of ingested records with what is actually indexed into Elasticsearch via APM, ensuring consistency between what is fetched and what is stored.
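
For that last check, something along these lines is what I have in mind: count the spans matching the input query over a window in the source data stream and compare it with the document count in the enrichment index. The one-hour window is just an example, and the destination count is cumulative, since the fetched _source does not carry the original @timestamp:

GET .ds-traces-apm-*/_count
{
  "query": {
    "bool": {
      "must": [
        { "term": { "processor.event": "span" } },
        { "exists": { "field": "transaction.id" } },
        { "range": { "@timestamp": { "gte": "now-1h", "lt": "now" } } }
      ],
      "must_not": [
        { "term": { "span.action": "query" } }
      ]
    }
  }
}

GET transaction_span_enrichment/_count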