Python ZOPE Application Framework Integration to Elastic APM

@lwintergerst @Bansriyar @ravindra ramnanani @rajesh @rajesh.balakrishnan@elastic.co

  1. Usecase – We are integrating the APM soln to our Product Ecosystem which involved Python, Messaging systems & DB. We have achieved 50% of our integration and need to have a full fledged APM to have Observability on all layers of our tech stack (Starting from Request Tracing from Front End to Till DB back to Client E nd to End). Require best practices which includes “How To” aforesaid need.

  2. We are using PYTHON ZOPE APPLICATION framework. In Zope - publisher.py , we are injecting custom instrumentation elastic code which tracks the requests and provides the waterfall model (spans and traces). The problem am facing is "the transactions in the waterfall model shows like "Client.gets" in all the calls which makes hard to understand.


    When we try to integrate (instrument) Python code base which uses Zope framework, observed the water fall model with “Client.gets” , how dow make it more informative with the name of actual method/call.

Hello, thanks so much for reaching out here!
Is it possible for you to share a snippet of your code with what you did so far?

It will depend a bit on what you have so far, but perhaps this will work:

import elasticapm

elasticapm.set_transaction_name('myapp.billing_process')

This sets the name of the currently active transaction to a custom value

hi @lwintergerst ,

Thanks for your reply. Appreciate your help !!

Here is the Code Snippet where am trying to INJECT ELASTIC CODE into my Application (Custom Instrumentation)



###############################################################
# Copyright (c) 2002 **Zope Foundation** and Contributors.
###############################################################
"""Python Object Publisher -- Publish Python objects on web servers
"""
import os
import sys
from thread import allocate_lock
import transaction
from urlparse import urlparse

from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from six import reraise
from zExceptions import Redirect
from zope.event import notify
from zope.globalrequest import setRequest, clearRequest
from zope.publisher.interfaces import ISkinnable
from zope.publisher.interfaces.browser import IBrowserPage
from zope.publisher.skinnable import setDefaultSkin
from zope.security.management import newInteraction, endInteraction

from ZPublisher.mapply import mapply
from ZPublisher import pubevents
from ZPublisher import Retry
from ZPublisher.HTTPRequest import HTTPRequest as Request
from ZPublisher.HTTPResponse import HTTPResponse as Response
from ZPublisher.utils import recordMetaData

####### ELASTIC APM INTEGRATION #######
from elasticapm import Client
import elasticapm
#import elasticapm.instrumentation.control
#from elasticapm.base import Client
#from elasticapm.conf import constants, setup_logging
#from elasticapm.handlers.logging import LoggingHandler
#from elasticapm.traces import execution_context
#from elasticapm.utils.disttracing import TraceParent
#from elasticapm.utils.logging import get_logger

#logger = get_logger("elasticapm.errors.client")

client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Aug25", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
#client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-maapply", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
elasticapm.instrumentation.control.instrument()

_default_debug_mode = False
_default_realm = None


def call_object(object, args, request):
    return object(*args)


def missing_name(name, request):
    if name == 'self':
        return request['PARENTS'][0]
    request.response.badRequestError(name)


def dont_publish_class(klass, request):
    request.response.forbiddenError("class %s" % klass.__name__)


def validate_user(request, user):
    newSecurityManager(request, user)


def set_default_debug_mode(debug_mode):
    global _default_debug_mode
    _default_debug_mode = debug_mode


def set_default_authentication_realm(realm):
    global _default_realm
    _default_realm = realm

@elasticapm.capture_span()
def publish(request, module_name, after_list, debug=0,
            # Optimize:
            call_object=call_object,
            missing_name=missing_name,
            dont_publish_class=dont_publish_class,
            mapply=mapply,
            ):

    (bobo_before, bobo_after, object, realm, debug_mode, err_hook,
     validated_hook, transactions_manager) = get_module_info(module_name)

    parents = None
    response = None
    
    #client.begin_transaction('request_mapply')	
    try:
        notify(pubevents.PubStart(request))
        # TODO pass request here once BaseRequest implements IParticipation
        newInteraction()

        request.processInputs()

        request_get = request.get
        response = request.response

        # First check for "cancel" redirect:
        if request_get('SUBMIT', '').strip().lower() == 'cancel':
            cancel = request_get('CANCEL_ACTION', '')
            if cancel:
                # Relative URLs aren't part of the spec, but are accepted by
                # some browsers.
                for part, base in zip(urlparse(cancel)[:3],
                                      urlparse(request['BASE1'])[:3]):
                    if not part:
                        continue
                    if not part.startswith(base):
                        cancel = ''
                        break
            if cancel:
                raise Redirect(cancel)

        after_list[0] = bobo_after
        if debug_mode:
            response.debug_mode = debug_mode
        if realm and not request.get('REMOTE_USER', None):
            response.realm = realm

        noSecurityManager()
        if bobo_before is not None:
            bobo_before()

        # Get the path list.
        # According to RFC1738 a trailing space in the path is valid.
        path = request_get('PATH_INFO')

        request['PARENTS'] = parents = [object]

        if transactions_manager:
            transactions_manager.begin()

        object = request.traverse(path, validated_hook=validated_hook)

        if IBrowserPage.providedBy(object):
            request.postProcessInputs()

        notify(pubevents.PubAfterTraversal(request))

        if transactions_manager:
            recordMetaData(object, request)
        #Elastic Code -  Client txn starts
        client.begin_transaction('request_mapply')
        result = mapply(object, request.args, request,
                        call_object, 1,
                        missing_name,
                        dont_publish_class,
                        request, bind=1)
        #import pdb; pdb.set_trace()
		#Elastic Code -  Client txn ends
        client.end_transaction(request.method + " " + request.get('PATH_INFO'), response.status)
        if result is not response:
            response.setBody(result)

        notify(pubevents.PubBeforeCommit(request))
        client.end_transaction(request.method + " " + request.get('PATH_INFO'), response.status)

        if transactions_manager:
            transactions_manager.commit()

        notify(pubevents.PubSuccess(request))
        endInteraction()
        
        #client.end_transaction(request.method + " " + request.get('PATH_INFO'), response.status)
        return response
    except:
        # save in order to give 'PubFailure' the original exception info
        exc_info = sys.exc_info()
        # DM: provide nicer error message for FTP
        sm = None
        if response is not None:
            sm = getattr(response, "setMessage", None)

        if sm is not None:
            from asyncore import compact_traceback
            cl, val = sys.exc_info()[:2]
            sm('%s: %s %s' % (
                getattr(cl, '__name__', cl), val,
                debug_mode and compact_traceback()[-1] or ''))

        # debug is just used by tests (has nothing to do with debug_mode!)
        if not debug and err_hook is not None:
            retry = False
            if parents:
                parents = parents[0]
            try:
                try:
                    return err_hook(parents, request,
                                    sys.exc_info()[0],
                                    sys.exc_info()[1],
                                    sys.exc_info()[2],
                                    )
                except Retry:
                    if not request.supports_retry():
                        return err_hook(parents, request,
                                        sys.exc_info()[0],
                                        sys.exc_info()[1],
                                        sys.exc_info()[2],
                                        )
                    retry = True
            finally:
                # Note: 'abort's can fail.
                # Nevertheless, we want end request handling.
                try:
                    try:
                        notify(pubevents.PubBeforeAbort(
                            request, exc_info, retry))
                    finally:
                        if transactions_manager:
                            transactions_manager.abort()
                finally:
                    endInteraction()
                    notify(pubevents.PubFailure(request, exc_info, retry))

            # Only reachable if Retry is raised and request supports retry.
            newrequest = request.retry()
            request.close()  # Free resources held by the request.

            # Set the default layer/skin on the newly generated request
            if ISkinnable.providedBy(newrequest):
                setDefaultSkin(newrequest)
            try:
                return publish(newrequest, module_name, after_list, debug)
            finally:
                newrequest.close()

        else:
            # Note: 'abort's can fail.
            # Nevertheless, we want end request handling.
            try:
                try:
                    notify(pubevents.PubBeforeAbort(request, exc_info, False))
                finally:
                    if transactions_manager:
                        transactions_manager.abort()
            finally:
                endInteraction()
                notify(pubevents.PubFailure(request, exc_info, False))
            raise

#@elasticapm.capture_span()
def publish_module_standard(
        module_name,
        stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr,
        environ=os.environ, debug=0, request=None, response=None):
    #client.begin_transaction('request')
    must_die = 0
    status = 200
    after_list = [None]
    try:
        try:
            if response is None:
                response = Response(stdout=stdout, stderr=stderr)
            else:
                stdout = response.stdout

            # debug is just used by tests (has nothing to do with debug_mode!)
            response.handle_errors = not debug

            if request is None:
                request = Request(stdin, environ, response)

            setRequest(request)

            # make sure that the request we hand over has the
            # default layer/skin set on it; subsequent code that
            # wants to look up views will likely depend on it
            if ISkinnable.providedBy(request):
                setDefaultSkin(request)
				
            #traceparent_string = elasticapm.get_trace_parent_header()
            #parent = elasticapm.trace_parent_from_headers(traceparent_string)
            #with elasticapm.capture_span('jiva_span', labels={"type": "jiva_publish"}):
            #client.begin_transaction('request')
            #client.begin_transaction(transaction_type="request", trace_parent=parent)
            response = publish(request, module_name, after_list, debug=debug)
            #import pdb; pdb.set_trace()
            #elasticapm.set_context(lambda: get_data_from_request(request), "request")
            #elasticapm.set_context(lambda: get_data_from_response(response), "response")
            #client.end_transaction(request.method + " " + request.get('PATH_INFO'), response.status)
        except (SystemExit, ImportError):
            # XXX: Rendered ImportErrors were never caught here because they
            # were re-raised as string exceptions. Maybe we should handle
            # ImportErrors like all other exceptions. Currently they are not
            # re-raised at all, so they don't show up here.
            must_die = sys.exc_info()
            request.response.exception(1)
        except:
            # debug is just used by tests (has nothing to do with debug_mode!)
            if debug:
                raise
            request.response.exception()
            status = response.getStatus()

        if response:
            outputBody = getattr(response, 'outputBody', None)
            if outputBody is not None:
                outputBody()
            else:
                response = str(response)
                if response:
                    stdout.write(response)

        # The module defined a post-access function, call it
        if after_list[0] is not None:
            after_list[0]()

    finally:
        if request is not None:
            request.close()
            clearRequest()

    if must_die:
        # Try to turn exception value into an exit code.
        try:
            if hasattr(must_die[1], 'code'):
                code = must_die[1].code
            else:
                code = int(must_die[1])
        except:
            code = must_die[1] and 1 or 0
        if hasattr(request.response, '_requestShutdown'):
            request.response._requestShutdown(code)

        try:
            reraise(must_die[0], must_die[1], must_die[2])
        finally:
            must_die = None
			
    return status


_l = allocate_lock()

#@elasticapm.capture_span()
def get_module_info(module_name, modules={},
                    acquire=_l.acquire,
                    release=_l.release):

    if module_name in modules:
        return modules[module_name]

    if module_name[-4:] == '.cgi':
        module_name = module_name[:-4]

    acquire()
    tb = None
    g = globals()
    try:
        try:
            module = __import__(module_name, g, g, ('__doc__',))

            # Let the app specify a realm
            if hasattr(module, '__bobo_realm__'):
                realm = module.__bobo_realm__
            elif _default_realm is not None:
                realm = _default_realm
            else:
                realm = module_name

            # Check for debug mode
            if hasattr(module, '__bobo_debug_mode__'):
                debug_mode = bool(module.__bobo_debug_mode__)
            else:
                debug_mode = _default_debug_mode

            bobo_before = getattr(module, "__bobo_before__", None)
            bobo_after = getattr(module, "__bobo_after__", None)

            if hasattr(module, 'bobo_application'):
                object = module.bobo_application
            elif hasattr(module, 'web_objects'):
                object = module.web_objects
            else:
                object = module

            error_hook = getattr(module, 'zpublisher_exception_hook', None)
            validated_hook = getattr(
                module, 'zpublisher_validated_hook', validate_user)

            transactions_manager = getattr(
                module, 'zpublisher_transactions_manager', None)
            if not transactions_manager:
                # Create a default transactions manager for use
                # by software that uses ZPublisher and ZODB but
                # not the rest of Zope.
                transactions_manager = DefaultTransactionsManager()

            info = (bobo_before, bobo_after, object, realm, debug_mode,
                    error_hook, validated_hook, transactions_manager)

            modules[module_name] = modules[module_name + '.cgi'] = info

            return info
        except Exception:
            t, v, tb = sys.exc_info()
            reraise(t, str(v), tb)
    finally:
        tb = None
        release()


class DefaultTransactionsManager(object):

    def begin(self):
        transaction.begin()

    def commit(self):
        if transaction.isDoomed():
            transaction.abort()
        else:
            transaction.commit()

    def abort(self):
        transaction.abort()


def publish_module(module_name,
                   stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr,
                   environ=os.environ, debug=0, request=None, response=None):
    """ publish a Python module """
    return publish_module_standard(module_name, stdin, stdout, stderr,
                                   environ, debug, request, response)
Code Snippets used to Integrate ELASTIC APM with ZOPE Application in PYTHON : 

####### ELASTIC APM INTEGRATION #######
from elasticapm import Client
import elasticapm
#import elasticapm.instrumentation.control
#from elasticapm.base import Client
#from elasticapm.conf import constants, setup_logging
#from elasticapm.handlers.logging import LoggingHandler
#from elasticapm.traces import execution_context
#from elasticapm.utils.disttracing import TraceParent
#from elasticapm.utils.logging import get_logger

#logger = get_logger("elasticapm.errors.client")

client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Aug25", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
#client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-maapply", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
elasticapm.instrumentation.control.instrument()

------------------------------------------------------------------------------------------
@elasticapm.capture_span()
def publish(request, module_name, after_list, debug=0,
            # Optimize:

------------------------------------------------------------------------------------------

#Elastic Code -  Client txn starts
        client.begin_transaction('request_mapply')
        result = mapply(object, request.args, request,
                        call_object, 1,
                        missing_name,
                        dont_publish_class,
                        request, bind=1)
        #import pdb; pdb.set_trace()
		#Elastic Code -  Client txn ends
        client.end_transaction(request.method + " " + request.get('PATH_INFO'), response.status)

@lwintergerst @Bansriyar @ravindra ramnanani @rajesh @rajesh.balakrishnan@elastic.co

Appreciate your help here !!

thanks, this is helpful.
Could you do one last thing for me and share a full screenshot of the Transaction?
Your screenshot in your post only shows the Client.get spans, but I would like to better understand where those are being captured.

What I'm looking for is a complete view like this:
Feel free to remove any confidential information if there is any

hi @lwintergerst ,

Thanks for your reply !! Appreciate for helping me.

Here you go the TRANSCATION SCREENSHOT > "Your screenshot in your post only shows the Client.get spans, but I would like to better understand where those are being captured.

What I'm looking for is a complete view"

Please refer these screenshots as this is a full view.

Let me know @lwintergerst any other clarifications you need.

Thank You
Dixit

Hi @lwintergerst ,

One more SAMPLE

Looking for your HELP !!

Thanks ,
Dixit

Great, thanks!

I looks like all of those spans are actually being created by the memcache instrumentation Supported Technologies | APM Python Agent Reference [6.x] | Elastic

Instead of renaming those (which would require you to tweak our auto instrumentation of them, which is not ideal), I suggest that you wrap the calls in another span.

Looking at the ZOPE docs, it seems like you might be using one of these modules too?

If that's the case, then I suggest identifying the place in your code where the session store is used, and wrapping parts of it into a capture span block

with elasticapm.capture_span('name123'):

and as I mentioned, not that I recommend it, but you could tweak the span names in the package here :

Even database calls are not wired up so want to know how to do?

hi @lwintergerst ,

Thanks for your reply, really helping me to go further.

Here I have couple of questions based on your suggestion -

  1. I looks like all of those spans are actually being created by the memcache instrumentation Supported Technologies | APM Python Agent Reference [6.x] | Elastic --- DONE , understood

  2. Don't want to go beyond Ideal recommendation, shall not change SPAN NAMES as suggested by you, thank you.

  3. Looking at the ZOPE docs, it seems like you might be using one of these modules too?

zopefoundation/Zope/blob/master/docs/zopebook/Sessions.rst#use-of-an-alternative-session-server

----- We are not using above mentioned module , we are using PUBLISH.py which is pasted above.

  1. We have problem wiring up DATABASE _ SQL CALLS with APPLICATION Call using ZOPE. Right now, am using mxodbc.py file to TRACK separately the Database SQL Calls.

How do I wire it up DB SQL Calls to Application Calls ?
Screen Shot :

MXODBC.py FILE

# connectors/mxodbc.py
# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""
Provide a SQLALchemy connector for the eGenix mxODBC commercial
Python adapter for ODBC. This is not a free product, but eGenix
provides SQLAlchemy with a license for use in continuous integration
testing.

This has been tested for use with mxODBC 3.1.2 on SQL Server 2005
and 2008, using the SQL Server Native driver. However, it is
possible for this to be used on other database platforms.

For more info on mxODBC, see http://www.egenix.com/

"""

import re
import sys
import warnings

from . import Connector

####### ELASTIC APM INTEGRATION #######
import elasticapm
#import elasticapm.instrumentation.control
#from elasticapm.base import Client
#from elasticapm.conf import constants, setup_logging
#from elasticapm.handlers.logging import LoggingHandler
#from elasticapm.traces import execution_context
#from elasticapm.utils.disttracing import TraceParent
#from elasticapm.utils.logging import get_logger

#logger = get_logger("elasticapm.errors.client")

#client = elasticapm.Client(service_name="Zope-172-27-2-33-5003-today", framework_name="Zope", framework_version="4.0", server_url="http://172.24.244.213:8200")
client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Sep28-DB-SQLAL", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
elasticapm.instrument()

class MxODBCConnector(Connector):
    driver = "mxodbc"

    supports_sane_multi_rowcount = False
    supports_unicode_statements = True
    supports_unicode_binds = True

    supports_native_decimal = True

    @classmethod
    def dbapi(cls):
        # this classmethod will normally be replaced by an instance
        # attribute of the same name, so this is normally only called once.
        cls._load_mx_exceptions()
        platform = sys.platform
        if platform == "win32":
            from mx.ODBC import Windows as Module
        # this can be the string "linux2", and possibly others
        elif "linux" in platform:
            from mx.ODBC import unixODBC as Module
        elif platform == "darwin":
            from mx.ODBC import iODBC as Module
        else:
            raise ImportError("Unrecognized platform for mxODBC import")
        return Module

    @classmethod
    def _load_mx_exceptions(cls):
        """ Import mxODBC exception classes into the module namespace,
        as if they had been imported normally. This is done here
        to avoid requiring all SQLAlchemy users to install mxODBC.
        """
        global InterfaceError, ProgrammingError
        from mx.ODBC import InterfaceError
        from mx.ODBC import ProgrammingError

    def on_connect(self):
        def connect(conn):
            conn.stringformat = self.dbapi.MIXED_STRINGFORMAT
            conn.datetimeformat = self.dbapi.PYDATETIME_DATETIMEFORMAT
            conn.decimalformat = self.dbapi.DECIMAL_DECIMALFORMAT
            conn.errorhandler = self._error_handler()

        return connect

    def _error_handler(self):
        """ Return a handler that adjusts mxODBC's raised Warnings to
        emit Python standard warnings.
        """
        from mx.ODBC.Error import Warning as MxOdbcWarning

        def error_handler(connection, cursor, errorclass, errorvalue):
            if issubclass(errorclass, MxOdbcWarning):
                errorclass.__bases__ = (Warning,)
                warnings.warn(
                    message=str(errorvalue), category=errorclass, stacklevel=2
                )
            else:
                raise errorclass(errorvalue)

        return error_handler

    def create_connect_args(self, url):
        r"""Return a tuple of \*args, \**kwargs for creating a connection.

        The mxODBC 3.x connection constructor looks like this:

            connect(dsn, user='', password='',
                    clear_auto_commit=1, errorhandler=None)

        This method translates the values in the provided uri
        into args and kwargs needed to instantiate an mxODBC Connection.

        The arg 'errorhandler' is not used by SQLAlchemy and will
        not be populated.

        """
        opts = url.translate_connect_args(username="user")
        opts.update(url.query)
        args = opts.pop("host")
        opts.pop("port", None)
        opts.pop("database", None)
        return (args,), opts

    def is_disconnect(self, e, connection, cursor):
        # TODO: eGenix recommends checking connection.closed here
        # Does that detect dropped connections ?
        if isinstance(e, self.dbapi.ProgrammingError):
            return "connection already closed" in str(e)
        elif isinstance(e, self.dbapi.Error):
            return "[08S01]" in str(e)
        else:
            return False

    def _get_server_version_info(self, connection):
        # eGenix suggests using conn.dbms_version instead
        # of what we're doing here
        dbapi_con = connection.connection
        version = []
        r = re.compile(r"[.\-]")
        # 18 == pyodbc.SQL_DBMS_VER
        for n in r.split(dbapi_con.getinfo(18)[1]):
            try:
                version.append(int(n))
            except ValueError:
                version.append(n)
        return tuple(version)

    def _get_direct(self, context):
        if context:
            native_odbc_execute = context.execution_options.get(
                "native_odbc_execute", "auto"
            )
            # default to direct=True in all cases, is more generally
            # compatible especially with SQL Server
            return False if native_odbc_execute is True else True
        else:
            return True

    def do_executemany(self, cursor, statement, parameters, context=None):
        cursor.executemany(
            statement, parameters, direct=self._get_direct(context)
        )
    
    #@elasticapm.capture_span()
    def do_execute(self, cursor, statement, parameters, context=None):
        client.begin_transaction('SQL_MXodbc_query')
        cursor.execute(statement, parameters, direct=self._get_direct(context))
        client.end_transaction("MXODBC_"+statement, "Success")

hi @lwintergerst ,

Appreciate your help as your guidance is getting me closer to success.

Need little more help !! Appreciate your help !!!

Once POC is complete I can propose my ORG to have Elastic APM to be part of my Organisation.

Thank you @lwintergerst

Thanks,
Dixit

Thanks for the additional info again.

Can you try replacing the last lines in your file with this:
Instead of starting new transactions what you actually want are additional spans here.

    def do_execute(self, cursor, statement, parameters, context=None):
        #client.begin_transaction('SQL_MXodbc_query')
        with elasticapm.capture_span(statement, span_type='db', span_subtype='sql', span_action='query'):
          cursor.execute(statement, parameters, direct=self._get_direct(context))
        #client.end_transaction("MXODBC_"+statement, "Success")

hi @lwintergerst ,

Works good Wow !!, thanks so much !!

  1. What should be the statement here as a best practice - client = PUBLISH.PY - elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Sep30-DB-SQLAL", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")

  2. Do I need to put the statement of similar kind in MXODBC.PY - Database Driver Code -
    client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Sep30-DB-SQLAL", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")

  3. Renaming the "Client.gets" would it help me to trace what kind of "query fired" for PyMEMCACHE ?

  4. I want to reformat or brig it to standards my INTEGRATION , now we have Elastic Code everywhere , please suggest some best practices ?

These are some last questions I had . I think we are better following your suggestions . THANK YOU @lwintergerst

Hi @lwintergerst ,

These statements are needed, I want to know some the best practices where some minimal set of code from Elastic APM can be used , right now am so much disoriented reading searching in google etc... below statements what are good to use and what are unwanted.


####### ELASTIC APM INTEGRATION #######
from elasticapm import Client
import elasticapm
#import elasticapm.instrumentation.control
#from elasticapm.base import Client
#from elasticapm.conf import constants, setup_logging
#from elasticapm.handlers.logging import LoggingHandler
#from elasticapm.traces import execution_context
#from elasticapm.utils.disttracing import TraceParent
#from elasticapm.utils.logging import get_logger

#logger = get_logger("elasticapm.errors.client")

client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Sep30-DB-SQLAL", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
#client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-maapply", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
elasticapm.instrumentation.control.instrument()
elasticapm.instrument()

Thank You @lwintergerst

You should only create the client once in your app. Creating a Client instance automatically sets the global instance which you can fetch from anywhere with elasticapm.get_client().

I don't think you actually need any custom code around pymemcache -- we instrument that library here.

At a baseline, you really only need

import elasticapm
client = elasticapm.Client(service_name="Zope-POCWQAAPM51-5003-Sep30-DB-SQLAL", framework_name="Zope", framework_version="4.0", server_url="http://172.27.2.33:8200")
elasticapm.instrument()

At that point you can use that client object to begin and end transactions. And elasticapm.capture_span() can be used anywhere to capture additional custom spans as long as a transaction is active.

Also I should link to this in case you haven't seen it: Instrumenting custom code | APM Python Agent Reference [6.x] | Elastic

Please reply with any remaining questions you have.

hi @basepi and @lwintergerst ,

Thank you for your guidance !! Much Appreciated.

Here are some more questions :slight_smile:

  1. According to documentation is this fine adding - @elasticapm.capture_span()
    and with elasticapm.capture_span('near-to-machine'): in the same method/code .
    Is this OK to add below code with @elasticapm.capture_span() - decorator and with elasticapm.capture_span - context getter
@elasticapm.capture_span()
 def do_execute(self, cursor, statement, parameters, context=None):
        #client.begin_transaction('SQL_MXodbc_query')
        with elasticapm.capture_span(statement, span_type='db', span_subtype='sql', span_action='query'):
          cursor.execute(statement, parameters, direct=self._get_direct(context))
        #client.end_transaction("MXODBC_"+statement, "Success")
  1. How to get QUERY which is used in the Application/ or copy the query ? Right now am unable to copy the query which is present in Request/response Trace (Waterfall model)
    If the query is small , it's ok we can copy but lengthy queries can't able to copy.

  2. How to instrument/sample/peek in confluent KAFKA /Apache Kafka which we use in our Product Landscape where Producer and Consumers are Python Clients.
    Kafka Inside metrics can we fetch ?

Once again @lwintergerst @basepi thank you so much.

Thanks,
Dixit

It seems a bit redundant. Now you'll have two spans per execute. I would probably only bother with the decorator or the context manager in this case.

This is because you're putting the query in the name of the span, which is really not where it belongs.

Instead, stick the statement in db.statement using the extra arg, as shown here for the dbapi2 instrumentation.

We have support for kafka -- it should create transactions when iterating over messages using the Kafka consumer (.poll and .__next__), as well as spans when sending messages.

hi @basepi ,

Please provide the sample with this context so that I can fetch "SQL Query".

Didn't understand fully about -> Instead, stick the statement in db.statement using the extra arg, as shown here for the dbapi2 instrumentation.

 def do_execute(self, cursor, statement, parameters, context=None):
        #client.begin_transaction('SQL_MXodbc_query')
        with elasticapm.capture_span(statement, span_type='db', span_subtype='sql', span_action='query'):
          cursor.execute(statement, parameters, direct=self._get_direct(context))
        #client.end_transaction("MXODBC_"+statement, "Success")

Appreciate your help !!