Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1ef88f8
PYTHON-5745 Consolidate command logging and monitoring into _CommandT…
blink1073 May 27, 2026
94b8af7
Fix unack_write: unconditionally emit success event and remove dead t…
blink1073 May 27, 2026
0d97f3a
Remove _start, _succeed, _fail and start_time from bulk write context…
blink1073 May 27, 2026
16a8c5e
Add prepare_command to bulk write contexts to encapsulate command fie…
blink1073 May 27, 2026
c0b1fbd
Refactor run_operation to use _CommandTelemetry
blink1073 May 28, 2026
c5f8cba
PYTHON-5745 Use _CommandTelemetry in bulk.py and client_bulk.py unack…
blink1073 May 28, 2026
4e64be1
PYTHON-5745 Replace client/publish_event with topology_id/listeners i…
blink1073 May 28, 2026
3ea117e
PYTHON-5745 Refactor _CommandTelemetry to use conn object; simplify n…
blink1073 May 29, 2026
6779d3f
PYTHON-5745 Remove formatting-only changes and MongoClient._topology_id
blink1073 May 29, 2026
d61aed4
PYTHON-5745 Remove extra blank line in mongo_client.py
blink1073 May 29, 2026
627809c
PYTHON-5745 Revert unrelated changes to test files and non-core sourc…
blink1073 May 29, 2026
21737c4
PYTHON-5745 Restore prepare_command methods to bulk write context cla…
blink1073 May 29, 2026
3620f6c
Fix type: ignore comment in tools/synchro.py for unasync import
blink1073 May 29, 2026
dc77a27
Fix typing: suppress unasync import ignore for both installed and mis…
blink1073 May 29, 2026
5fd2528
PYTHON-5745 Add log_events flag to _CommandTelemetry; suppress hello …
blink1073 May 29, 2026
55755db
PYTHON-5745 Remove dead _start, _fail, _succeed methods from bulk wri…
blink1073 May 29, 2026
3c51055
Remove spurious blank lines after module docstrings
blink1073 May 29, 2026
80f01f1
Add local aliases in network.command() to reduce diff vs master
blink1073 May 29, 2026
f813ca8
Move cursor operation telemetry from server.py into AsyncConnection
blink1073 May 29, 2026
56ca358
Clean up AsyncConnection: merge write_command/unack_write, rename run…
blink1073 May 29, 2026
3afc49a
Move cursor/bulk-write telemetry to network.py, centralising _Command…
blink1073 May 30, 2026
7e94258
Note in network.command() docstring to use AsyncConnection.command in…
blink1073 May 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions pymongo/_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# Copyright 2026-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Internal telemetry helpers for unified logging and APM event publishing."""
from __future__ import annotations

import datetime
import logging
from typing import TYPE_CHECKING, Any, Optional

from pymongo.errors import NotPrimaryError, OperationFailure
from pymongo.logger import (
_COMMAND_LOGGER,
_CommandStatusMessage,
_debug_log,
)
from pymongo.message import _convert_exception

if TYPE_CHECKING:
from pymongo.typings import _DocumentOut


class _CommandTelemetry:
"""Unified context manager for command-level telemetry.

Consolidates debug logging and APM event publishing into a single
context manager, eliminating duplicated telemetry code at each call site.

On entry, publishes the command-started event to all enabled channels.
Call :meth:`handle_succeeded` with the server reply on success.
On exit, if an exception is propagating and the outcome has not already
been recorded, the command-failed event is published automatically.

Usage::

with _CommandTelemetry(
conn=conn,
command_name=name,
database_name=dbname,
spec=spec,
orig=orig,
request_id=request_id,
publish_events=publish,
) as cmd_telemetry:
reply = do_network_call()
cmd_telemetry.handle_succeeded(reply)
# Failures are published automatically in __exit__.
"""

__slots__ = (
"_conn",
"_command_name",
"_database_name",
"_spec",
"_orig",
"_log_events",
"_publish_events",
"_request_id",
"_operation_id",
"_start_time",
"_handled",
)

def __init__(
self,
conn: Any,
command_name: str,
database_name: str,
spec: Any,
orig: Any,
request_id: int,
publish_events: bool = True,
log_events: bool = True,
operation_id: Optional[int] = None,
) -> None:
self._conn = conn
self._command_name = command_name
self._database_name = database_name
self._spec = spec
self._orig = orig
self._request_id = request_id
self._publish_events = publish_events
self._log_events = log_events
self._operation_id = operation_id
self._start_time: Optional[datetime.datetime] = None
self._handled = False

def __enter__(self) -> _CommandTelemetry:
self._start_time = datetime.datetime.now()
if self._log_events and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=self._conn._topology_id,
command=self._spec,
commandName=next(iter(self._spec)),
databaseName=self._database_name,
requestId=self._request_id,
operationId=self._request_id,
driverConnectionId=self._conn.id,
serverConnectionId=self._conn.server_connection_id,
serverHost=self._conn.address[0],
serverPort=self._conn.address[1],
serviceId=self._conn.service_id,
)
if self._publish_events:
self._conn.listeners.publish_command_start(
self._orig,
self._database_name,
self._request_id,
self._conn.address,
self._conn.server_connection_id,
op_id=self._operation_id,
service_id=self._conn.service_id,
)
return self

def handle_succeeded(
self,
reply: Any,
speculative_hello: bool = False,
) -> datetime.timedelta:
"""Publish command-succeeded telemetry and return the elapsed duration.

Must be called explicitly by the caller when the command succeeds.
Sets the internal *handled* flag so that ``__exit__`` does not also
publish a failure event when the ``with`` block exits normally.
"""
assert self._start_time is not None
duration = datetime.datetime.now() - self._start_time
if self._log_events and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=self._conn._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(self._spec)),
databaseName=self._database_name,
requestId=self._request_id,
operationId=self._request_id,
driverConnectionId=self._conn.id,
serverConnectionId=self._conn.server_connection_id,
serverHost=self._conn.address[0],
serverPort=self._conn.address[1],
serviceId=self._conn.service_id,
speculative_authenticate="speculativeAuthenticate" in self._orig,
)
if self._publish_events:
self._conn.listeners.publish_command_success(
duration,
reply,
self._command_name,
self._request_id,
self._conn.address,
self._conn.server_connection_id,
op_id=self._operation_id,
service_id=self._conn.service_id,
speculative_hello=speculative_hello,
database_name=self._database_name,
)
self._handled = True
return duration

def handle_failed(self, exc: BaseException) -> None:
"""Publish command-failed telemetry.

Called automatically by ``__exit__`` when an exception propagates
out of the ``with`` block. May also be called explicitly in cases
where the exception is caught inside the ``with`` block (e.g. when
the caller must not re-raise).
"""
assert self._start_time is not None
duration = datetime.datetime.now() - self._start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc) # type: ignore[arg-type]
if self._log_events and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=self._conn._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(self._spec)),
databaseName=self._database_name,
requestId=self._request_id,
operationId=self._request_id,
driverConnectionId=self._conn.id,
serverConnectionId=self._conn.server_connection_id,
serverHost=self._conn.address[0],
serverPort=self._conn.address[1],
serviceId=self._conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if self._publish_events:
self._conn.listeners.publish_command_failure(
duration,
failure,
self._command_name,
self._request_id,
self._conn.address,
self._conn.server_connection_id,
op_id=self._operation_id,
service_id=self._conn.service_id,
database_name=self._database_name,
)
self._handled = True

def __exit__(
self,
exc_type: Optional[type],
exc_val: Optional[BaseException],
exc_tb: Any,
) -> None:
if exc_val is not None and not self._handled:
self.handle_failed(exc_val)
Loading
Loading