diff --git a/docs/index.rst b/docs/index.rst
index 3e53104e..77efd8b3 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -26,6 +26,12 @@ Python client for `YDB `_ — a fault-tolerant distributed SQ
coordination
scheme
+.. toctree::
+ :hidden:
+ :caption: Observability
+
+ opentelemetry
+
.. toctree::
:hidden:
:caption: Reference
@@ -82,7 +88,7 @@ Distributed Coordination
------------------------
The :doc:`coordination` page covers distributed semaphores and leader election. If you
-need to limit concurrent access to a shared resource across multiple processes or hosts,
+need to limit concurrent access to a shared resource across multiple processes or hosts,
this is the service to use.
Schema Management
@@ -103,6 +109,15 @@ use the ``@ydb_retry`` decorator. Skipping this section is a common source of pr
incidents.
+Observability
+-------------
+
+The :doc:`opentelemetry` page explains how to add distributed tracing to your
+application using OpenTelemetry. One call to ``enable_tracing()`` instruments
+query sessions, transactions, and connection pool operations — so you can
+visualize request flow in Jaeger, Grafana, or any OpenTelemetry-compatible backend.
+
+
API Reference
-------------
diff --git a/docs/opentelemetry.rst b/docs/opentelemetry.rst
new file mode 100644
index 00000000..32991b17
--- /dev/null
+++ b/docs/opentelemetry.rst
@@ -0,0 +1,225 @@
+OpenTelemetry Tracing
+=====================
+
+The SDK provides built-in distributed tracing via `OpenTelemetry `_.
+When enabled, key YDB operations — such as session creation, query execution, transaction
+commit/rollback, and driver initialization — produce OpenTelemetry spans. Trace
+context is automatically propagated to the YDB server through gRPC metadata using the
+`W3C Trace Context `_ standard.
+
+Tracing is **zero-cost when disabled**: the SDK uses no-op stubs by default, so there is
+no overhead unless you explicitly opt in.
+
+
+Installation
+------------
+
+OpenTelemetry packages are not included by default. Install the SDK with the
+``opentelemetry`` extra:
+
+.. code-block:: sh
+
+ pip install ydb[opentelemetry]
+
+This pulls in ``opentelemetry-api``. You will also need ``opentelemetry-sdk`` and an
+exporter for your tracing backend, for example:
+
+.. code-block:: sh
+
+ # OTLP/gRPC exporter (works with Jaeger, Tempo, and others)
+ pip install opentelemetry-exporter-otlp-proto-grpc
+
+
+Enabling Tracing
+----------------
+
+Call ``enable_tracing()`` once, **after** configuring your OpenTelemetry tracer provider
+and **before** creating a ``Driver``:
+
+.. code-block:: python
+
+ from opentelemetry import trace
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.sdk.trace.export import BatchSpanProcessor
+ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+ from opentelemetry.sdk.resources import Resource
+
+ import ydb
+ from ydb.opentelemetry import enable_tracing
+
+ # 1. Set up OpenTelemetry
+ resource = Resource(attributes={"service.name": "my-service"})
+ provider = TracerProvider(resource=resource)
+ provider.add_span_processor(
+ BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
+ )
+ trace.set_tracer_provider(provider)
+
+ # 2. Enable YDB tracing
+ enable_tracing()
+
+ # 3. Use the SDK as usual — spans are created automatically
+ with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
+ driver.wait(timeout=5)
+ with ydb.QuerySessionPool(driver) as pool:
+ pool.execute_with_retries("SELECT 1")
+
+ provider.shutdown()
+
+``enable_tracing()`` accepts an optional ``tracer`` argument. If omitted, the SDK
+obtains a tracer named ``"ydb.sdk"`` from the global tracer provider.
+
+
+What Is Instrumented
+--------------------
+
+The following operations produce spans:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 35 20 45
+
+ * - Span Name
+ - Kind
+ - Description
+ * - ``ydb.Driver.Initialize``
+ - INTERNAL
+ - Driver wait / endpoint discovery.
+ * - ``ydb.CreateSession``
+ - CLIENT
+ - Creating a new query session.
+ * - ``ydb.ExecuteQuery``
+ - CLIENT
+ - Executing a query (including ``execute_with_retries``).
+ * - ``ydb.Commit``
+ - CLIENT
+ - Committing an explicit transaction.
+ * - ``ydb.Rollback``
+ - CLIENT
+ - Rolling back a transaction.
+
+All spans are nested under the currently active span, so wrapping your application
+logic in a parent span produces a complete trace tree:
+
+.. code-block:: python
+
+ tracer = trace.get_tracer(__name__)
+
+ with tracer.start_as_current_span("handle-request"):
+ pool.execute_with_retries("SELECT 1")
+ # ↳ ydb.CreateSession (if a new session is needed)
+ # ↳ ydb.ExecuteQuery
+
+
+Span Attributes
+---------------
+
+Every YDB span carries these semantic attributes:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Attribute
+ - Description
+ * - ``db.system.name``
+ - Always ``"ydb"``.
+ * - ``db.namespace``
+ - Database path (e.g. ``"/local"``).
+ * - ``server.address``
+ - Endpoint host.
+ * - ``server.port``
+ - Endpoint port.
+
+Additional attributes are set when available:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Attribute
+ - Description
+ * - ``ydb.session.id``
+ - Session identifier.
+ * - ``ydb.node.id``
+ - YDB node that handled the request.
+ * - ``ydb.tx.id``
+ - Transaction identifier.
+
+On errors, the span also records:
+
+- ``error.type`` — ``"ydb_error"``, ``"transport_error"``, or the Python exception class name.
+- ``db.response.status_code`` — the YDB status code name (e.g. ``"SCHEME_ERROR"``).
+
+
+Trace Context Propagation
+-------------------------
+
+When tracing is enabled, the SDK automatically injects trace context headers into
+every gRPC call to YDB using the globally configured OpenTelemetry propagator
+(``opentelemetry.propagate.inject``). By default, OpenTelemetry uses the
+`W3C Trace Context `_ propagator, which adds
+``traceparent`` and ``tracestate`` headers.
+
+YDB server expects W3C Trace Context headers, so the default propagator configuration
+works out of the box. This allows the server to correlate client spans with
+server-side processing, enabling end-to-end trace visibility across the entire
+request path.
+
+
+Async Usage
+-----------
+
+Tracing works identically with the async driver. Call ``enable_tracing()`` once at
+startup:
+
+.. code-block:: python
+
+ import asyncio
+ import ydb
+ from ydb.opentelemetry import enable_tracing
+
+ enable_tracing()
+
+ async def main():
+ async with ydb.aio.Driver(
+ endpoint="grpc://localhost:2136",
+ database="/local",
+ ) as driver:
+ await driver.wait(timeout=5)
+ async with ydb.aio.QuerySessionPool(driver) as pool:
+ await pool.execute_with_retries("SELECT 1")
+
+ asyncio.run(main())
+
+
+
+Using a Custom Tracer
+---------------------
+
+To use a specific tracer instead of the global one:
+
+.. code-block:: python
+
+ from opentelemetry import trace
+
+ my_tracer = trace.get_tracer("my.custom.tracer")
+ enable_tracing(tracer=my_tracer)
+
+
+Running the Examples
+--------------------
+
+The ``examples/opentelemetry/`` directory contains ready-to-run examples with a Docker
+Compose setup that starts YDB, an OTLP collector, Tempo, Prometheus, and Grafana:
+
+.. code-block:: sh
+
+ cd examples/opentelemetry
+ docker compose -f compose-e2e.yaml up -d
+
+ # Run the example
+ python example.py
+
+Open `http://localhost:3000 `_ (Grafana) to explore the
+collected traces via the Tempo data source.
diff --git a/examples/opentelemetry/compose-e2e.yaml b/examples/opentelemetry/compose-e2e.yaml
new file mode 100644
index 00000000..933d9a38
--- /dev/null
+++ b/examples/opentelemetry/compose-e2e.yaml
@@ -0,0 +1,61 @@
+version: "3.3"
+services:
+ ydb:
+ image: ydbplatform/local-ydb:trunk
+ restart: always
+ hostname: localhost
+ platform: linux/amd64
+ environment:
+ YDB_DEFAULT_LOG_LEVEL: NOTICE
+ GRPC_TLS_PORT: "2135"
+ GRPC_PORT: "2136"
+ MON_PORT: "8765"
+ YDB_USE_IN_MEMORY_PDISKS: "true"
+ command: [ "--config-path", "/ydb_config/ydb-config-with-tracing.yaml" ]
+ ports:
+ - "2135:2135"
+ - "2136:2136"
+ - "8765:8765"
+ volumes:
+ - ./ydb_config:/ydb_config:ro
+
+ otel-collector:
+ image: otel/opentelemetry-collector-contrib:latest
+ command: [ "--config=/etc/otelcol/config.yaml" ]
+ volumes:
+ - ./otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
+ ports:
+ - "4317:4317"
+ - "4318:4318"
+ - "9464:9464"
+ - "13133:13133"
+ - "13317:55679"
+
+ prometheus:
+ image: prom/prometheus:latest
+ volumes:
+ - ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro
+ ports:
+ - "9090:9090"
+ depends_on: [ otel-collector ]
+
+ tempo:
+ image: grafana/tempo:2.4.1
+ command: [ "-config.file=/etc/tempo.yaml" ]
+ volumes:
+ - ./tempo.yaml:/etc/tempo.yaml:ro
+ ports:
+ - "3200:3200"
+ depends_on: [ otel-collector ]
+
+ grafana:
+ image: grafana/grafana:10.4.2
+ environment:
+ GF_AUTH_ANONYMOUS_ENABLED: "true"
+ GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin"
+ volumes:
+ - ./grafana/provisioning:/etc/grafana/provisioning:ro
+ - ./grafana/dashboards:/var/lib/grafana/dashboards:ro
+ ports:
+ - "3000:3000"
+ depends_on: [ prometheus, tempo ]
diff --git a/examples/opentelemetry/example.py b/examples/opentelemetry/example.py
new file mode 100644
index 00000000..d36397c1
--- /dev/null
+++ b/examples/opentelemetry/example.py
@@ -0,0 +1,65 @@
+"""Minimal example: OpenTelemetry tracing for YDB Python SDK."""
+
+import asyncio
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.resources import Resource
+
+import ydb
+from ydb.opentelemetry import enable_tracing
+
+resource = Resource(attributes={"service.name": "ydb-example"})
+provider = TracerProvider(resource=resource)
+provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317")))
+trace.set_tracer_provider(provider)
+
+tracer = trace.get_tracer(__name__)
+enable_tracing(tracer)
+
+ENDPOINT = "grpc://localhost:2136"
+DATABASE = "/local"
+
+
+def sync_example():
+ """Sync: session execute and transaction execute + commit."""
+ with ydb.Driver(endpoint=ENDPOINT, database=DATABASE) as driver:
+ driver.wait(timeout=5)
+
+ with ydb.QuerySessionPool(driver) as pool:
+ with tracer.start_as_current_span("sync-example"):
+ pool.execute_with_retries("SELECT 1")
+
+ def tx_callee(session):
+ with session.transaction() as tx:
+ list(tx.execute("SELECT 1"))
+ tx.commit()
+
+ pool.retry_operation_sync(tx_callee)
+
+
+async def async_example():
+ """Async: session execute and transaction execute + commit."""
+ async with ydb.aio.Driver(endpoint=ENDPOINT, database=DATABASE) as driver:
+ await driver.wait(timeout=5)
+
+ async with ydb.aio.QuerySessionPool(driver) as pool:
+ with tracer.start_as_current_span("async-example"):
+ await pool.execute_with_retries("SELECT 1")
+
+ async def tx_callee(session):
+ async with session.transaction() as tx:
+ result = await tx.execute("SELECT 1")
+ async for _ in result:
+ pass
+ await tx.commit()
+
+ await pool.retry_operation_async(tx_callee)
+
+
+sync_example()
+asyncio.run(async_example())
+
+provider.shutdown()
diff --git a/examples/opentelemetry/grafana/dashboards/README.md b/examples/opentelemetry/grafana/dashboards/README.md
new file mode 100644
index 00000000..eb47493a
--- /dev/null
+++ b/examples/opentelemetry/grafana/dashboards/README.md
@@ -0,0 +1,5 @@
+This folder is intentionally left empty.
+
+Grafana is provisioned with Tempo + Prometheus datasources; use **Explore** to search traces.
+
+
diff --git a/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml
new file mode 100644
index 00000000..5ccefdc1
--- /dev/null
+++ b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml
@@ -0,0 +1,13 @@
+apiVersion: 1
+
+providers:
+ - name: 'default'
+ orgId: 1
+ folder: ''
+ type: file
+ disableDeletion: true
+ editable: false
+ options:
+ path: /var/lib/grafana/dashboards
+
+
diff --git a/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml
new file mode 100644
index 00000000..05ba5bd9
--- /dev/null
+++ b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml
@@ -0,0 +1,22 @@
+apiVersion: 1
+
+datasources:
+ - name: Prometheus
+ type: prometheus
+ access: proxy
+ url: http://prometheus:9090
+ isDefault: true
+ editable: false
+
+ - name: Tempo
+ type: tempo
+ access: proxy
+ url: http://tempo:3200
+ editable: false
+ jsonData:
+ tracesToMetrics:
+ datasourceUid: Prometheus
+ serviceMap:
+ datasourceUid: Prometheus
+
+
diff --git a/examples/opentelemetry/otel-collector-config.yaml b/examples/opentelemetry/otel-collector-config.yaml
new file mode 100644
index 00000000..7f784445
--- /dev/null
+++ b/examples/opentelemetry/otel-collector-config.yaml
@@ -0,0 +1,44 @@
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+ http:
+ endpoint: 0.0.0.0:4318
+
+processors:
+ batch: { }
+
+exporters:
+ prometheus:
+ endpoint: 0.0.0.0:9464
+ resource_to_telemetry_conversion:
+ enabled: true
+
+ otlp/tempo:
+ endpoint: tempo:4317
+ tls:
+ insecure: true
+
+ debug:
+ verbosity: detailed
+
+extensions:
+ health_check:
+ endpoint: 0.0.0.0:13133
+
+ zpages:
+ endpoint: 0.0.0.0:55679
+
+service:
+ extensions: [ health_check, zpages ]
+ pipelines:
+ metrics:
+ receivers: [ otlp ]
+ processors: [ batch ]
+ exporters: [ prometheus ]
+
+ traces:
+ receivers: [ otlp ]
+ processors: [ batch ]
+ exporters: [ otlp/tempo, debug ]
diff --git a/examples/opentelemetry/prometheus.yaml b/examples/opentelemetry/prometheus.yaml
new file mode 100644
index 00000000..64b31821
--- /dev/null
+++ b/examples/opentelemetry/prometheus.yaml
@@ -0,0 +1,7 @@
+global:
+ scrape_interval: 5s
+
+scrape_configs:
+ - job_name: otel-collector
+ static_configs:
+ - targets: ["otel-collector:9464"]
diff --git a/examples/opentelemetry/tempo.yaml b/examples/opentelemetry/tempo.yaml
new file mode 100644
index 00000000..43dbb19c
--- /dev/null
+++ b/examples/opentelemetry/tempo.yaml
@@ -0,0 +1,15 @@
+server:
+ http_listen_port: 3200
+
+distributor:
+ receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+
+storage:
+ trace:
+ backend: local
+ local:
+ path: /tmp/tempo
diff --git a/examples/opentelemetry/ydb_config/README.md b/examples/opentelemetry/ydb_config/README.md
new file mode 100644
index 00000000..cbffaaba
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/README.md
@@ -0,0 +1,28 @@
+# YDB server-side tracing (OpenTelemetry)
+
+This folder is used to keep a **custom YDB config** that enables server-side OpenTelemetry tracing.
+
+## 1) Export the default config from a running container
+
+If YDB is running as `ydb-local`:
+
+```bash
+docker cp ydb-local:/ydb_data/cluster/kikimr_configs/config.yaml ./ydb_config/ydb-config.yaml
+```
+
+## 2) Enable OpenTelemetry exporter in the config
+
+Edit `ydb-config.yaml` and add the contents of `otel-tracing-snippet.yaml` (usually as a top-level section).
+
+Default OTLP endpoint (inside docker-compose network): `grpc://otel-collector:4317`
+Default service name (so you can find it in Tempo/Grafana): `ydb`
+
+## 3) Run with the overridden config
+
+Restart YDB (the main `compose-e2e.yaml` starts the server with `--config-path /ydb_config/ydb-config-with-tracing.yaml`, so place your edited config there):
+
+```bash
+docker compose -f compose-e2e.yaml up -d --force-recreate ydb
+```
+
+Now you should see additional server-side traces in Tempo/Grafana (the service name is `ydb`, as set in the snippet).
diff --git a/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml
new file mode 100644
index 00000000..bd5978d2
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml
@@ -0,0 +1,26 @@
+tracing_config:
+ backend:
+ opentelemetry:
+ collector_url: grpc://otel-collector:4317
+ service_name: ydb
+ external_throttling:
+ - scope:
+ database: /local
+ max_traces_per_minute: 60
+ max_traces_burst: 3
+ # Highest tracing detail for *sampled* traces (YDB-generated trace-id).
+ # Note: requests with an external `traceparent` are traced at level 13 (Detailed) per YDB docs.
+ sampling:
+ - scope:
+ database: /local
+ fraction: 1
+ level: 15
+ max_traces_per_minute: 1000
+ max_traces_burst: 100
+ uploader:
+ max_exported_spans_per_second: 30
+ max_spans_in_batch: 100
+ max_bytes_in_batch: 10485760 # 10 MiB
+ max_export_requests_inflight: 3
+ max_batch_accumulation_milliseconds: 5000
+ span_export_timeout_seconds: 120
diff --git a/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml
new file mode 100644
index 00000000..ef93d0e6
--- /dev/null
+++ b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml
@@ -0,0 +1,349 @@
+actor_system_config:
+ batch_executor: 2
+ executor:
+ - name: System
+ spin_threshold: 0
+ threads: 2
+ type: BASIC
+ - name: User
+ spin_threshold: 0
+ threads: 3
+ type: BASIC
+ - name: Batch
+ spin_threshold: 0
+ threads: 2
+ type: BASIC
+ - name: IO
+ threads: 1
+ time_per_mailbox_micro_secs: 100
+ type: IO
+ - name: IC
+ spin_threshold: 10
+ threads: 1
+ time_per_mailbox_micro_secs: 100
+ type: BASIC
+ io_executor: 3
+ scheduler:
+ progress_threshold: 10000
+ resolution: 1024
+ spin_threshold: 0
+ service_executor:
+ - executor_id: 4
+ service_name: Interconnect
+ sys_executor: 0
+ user_executor: 1
+blob_storage_config:
+ service_set:
+ availability_domains: 1
+ groups:
+ - erasure_species: 0
+ group_generation: 1
+ group_id: 0
+ rings:
+ - fail_domains:
+ - vdisk_locations:
+ - node_id: 1
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisk_slot_id: 0
+ pdisks:
+ - node_id: 1
+ path: SectorMap:1:64
+ pdisk_category: 0
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisks:
+ - vdisk_id:
+ domain: 0
+ group_generation: 1
+ group_id: 0
+ ring: 0
+ vdisk: 0
+ vdisk_location:
+ node_id: 1
+ pdisk_guid: 1
+ pdisk_id: 1
+ vdisk_slot_id: 0
+channel_profile_config:
+ profile:
+ - channel:
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ profile_id: 0
+ - channel:
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ - erasure_species: none
+ pdisk_category: 0
+ storage_pool_kind: hdd
+ profile_id: 1
+domains_config:
+ domain:
+ - domain_id: 1
+ name: local
+ storage_pool_types:
+ - kind: hdd
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdd1
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdd2
+ pool_config:
+ box_id: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ - kind: hdde
+ pool_config:
+ box_id: 1
+ encryption_mode: 1
+ erasure_species: none
+ kind: hdd
+ pdisk_filter:
+ - property:
+ - type: ROT
+ vdisk_kind: Default
+ security_config:
+ default_users:
+ - name: root
+ password: '1234'
+ state_storage:
+ - ring:
+ nto_select: 1
+ ring:
+ - node:
+ - 1
+ use_ring_specific_node_selection: true
+ ssid: 1
+feature_flags:
+ enable_drain_on_shutdown: false
+ enable_mvcc_snapshot_reads: true
+ enable_persistent_query_stats: true
+ enable_public_api_external_blobs: false
+ enable_scheme_transactions_at_scheme_shard: true
+federated_query_config:
+ audit:
+ enabled: false
+ uaconfig:
+ uri: ''
+ checkpoint_coordinator:
+ checkpointing_period_millis: 1000
+ enabled: true
+ max_inflight: 1
+ storage:
+ endpoint: ''
+ common:
+ ids_prefix: pt
+ use_bearer_for_ydb: true
+ control_plane_proxy:
+ enabled: true
+ request_timeout: 30s
+ control_plane_storage:
+ available_binding:
+ - DATA_STREAMS
+ - OBJECT_STORAGE
+ available_connection:
+ - YDB_DATABASE
+ - CLICKHOUSE_CLUSTER
+ - DATA_STREAMS
+ - OBJECT_STORAGE
+ - MONITORING
+ enabled: true
+ storage:
+ endpoint: ''
+ db_pool:
+ enabled: true
+ storage:
+ endpoint: ''
+ enabled: false
+ gateways:
+ dq:
+ default_settings: []
+ enabled: true
+ pq:
+ cluster_mapping: []
+ solomon:
+ cluster_mapping: []
+ nodes_manager:
+ enabled: true
+ pending_fetcher:
+ enabled: true
+ pinger:
+ ping_period: 30s
+ private_api:
+ enabled: true
+ private_proxy:
+ enabled: true
+ resource_manager:
+ enabled: true
+ token_accessor:
+ enabled: true
+grpc_config:
+ ca: /ydb_certs/ca.pem
+ cert: /ydb_certs/cert.pem
+ host: '[::]'
+ key: /ydb_certs/key.pem
+ services:
+ - nbs
+ - legacy
+ - tablet_service
+ - yql
+ - discovery
+ - cms
+ - locking
+ - kesus
+ - pq
+ - pqcd
+ - pqv1
+ - topic
+ - datastreams
+ - scripting
+ - clickhouse_internal
+ - rate_limiter
+ - analytics
+ - export
+ - import
+ - yq
+ - keyvalue
+ - monitoring
+ - auth
+ - query_service
+ - view
+interconnect_config:
+ start_tcp: true
+kafka_proxy_config:
+ enable_kafka_proxy: true
+ listening_port: 9092
+kqpconfig:
+ settings:
+ - name: _ResultRowsLimit
+ value: '1000'
+log_config:
+ default_level: 5
+ entry: []
+ sys_log: false
+nameservice_config:
+ node:
+ - address: ::1
+ host: localhost
+ node_id: 1
+ port: 19001
+ walle_location:
+ body: 1
+ data_center: '1'
+ rack: '1'
+net_classifier_config:
+ cms_config_timeout_seconds: 30
+ net_data_file_path: /ydb_data/netData.tsv
+ updater_config:
+ net_data_update_interval_seconds: 60
+ retry_interval_seconds: 30
+pqcluster_discovery_config:
+ enabled: false
+pqconfig:
+ check_acl: false
+ cluster_table_path: ''
+ clusters_update_timeout_sec: 1
+ enable_proto_source_id_info: true
+ enabled: true
+ max_storage_node_port: 65535
+ meta_cache_timeout_sec: 1
+ quoting_config:
+ enable_quoting: false
+ require_credentials_in_new_protocol: false
+ root: ''
+ topics_are_first_class_citizen: true
+ version_table_path: ''
+sqs_config:
+ enable_dead_letter_queues: true
+ enable_sqs: false
+ force_queue_creation_v2: true
+ force_queue_deletion_v2: true
+ scheme_cache_hard_refresh_time_seconds: 0
+ scheme_cache_soft_refresh_time_seconds: 0
+static_erasure: none
+system_tablets:
+ default_node:
+ - 1
+ flat_schemeshard:
+ - info:
+ tablet_id: 72057594046678944
+ flat_tx_coordinator:
+ - node:
+ - 1
+ tx_allocator:
+ - node:
+ - 1
+ tx_mediator:
+ - node:
+ - 1
+table_service_config:
+ resource_manager:
+ channel_buffer_size: 262144
+ mkql_heavy_program_memory_limit: 1048576
+ mkql_light_program_memory_limit: 65536
+ verbose_memory_limit_exception: true
+ sql_version: 1
+tracing_config:
+ backend:
+ opentelemetry:
+ collector_url: grpc://otel-collector:4317
+ service_name: ydb
+ external_throttling:
+ - scope:
+ database: /local
+ max_traces_per_minute: 1000
+ max_traces_burst: 100
+ sampling:
+ - scope:
+ database: /local
+ fraction: 1.0
+ level: 15
+ max_traces_per_minute: 1000
+# max_traces_burst: 100
+ uploader:
+ max_exported_spans_per_second: 30
+ max_spans_in_batch: 100
+ max_bytes_in_batch: 10485760 # 10 MiB
+ max_export_requests_inflight: 3
+ max_batch_accumulation_milliseconds: 5000
+ span_export_timeout_seconds: 120
diff --git a/pyproject.toml b/pyproject.toml
index 41e7ef6f..0b08f0b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,6 +41,7 @@ module = [
"requests.*",
"ydb.public.api.*",
"contrib.ydb.public.api.*",
+ "opentelemetry.*",
]
ignore_missing_imports = true
diff --git a/setup.py b/setup.py
index 55ce9029..0f850fbf 100644
--- a/setup.py
+++ b/setup.py
@@ -37,5 +37,6 @@
options={"bdist_wheel": {"universal": True}},
extras_require={
"yc": ["yandexcloud", ],
+ "opentelemetry": ["opentelemetry-api>=1.0.0"],
}
)
diff --git a/test-requirements.txt b/test-requirements.txt
index a5b65963..0976ce50 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -43,6 +43,8 @@ sqlalchemy==1.4.26
pylint-protobuf
cython
freezegun>=1.3.0
+opentelemetry-api>=1.0.0
+opentelemetry-sdk>=1.0.0
# pytest-cov
yandexcloud
-e .
diff --git a/tests/tracing/__init__.py b/tests/tracing/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/tracing/conftest.py b/tests/tracing/conftest.py
new file mode 100644
index 00000000..94f653b8
--- /dev/null
+++ b/tests/tracing/conftest.py
@@ -0,0 +1,54 @@
+"""Shared fixtures for OpenTelemetry tracing tests.
+
+Sets up an in-memory TracerProvider so that spans created by the SDK
+can be collected and inspected without any external backend.
+"""
+
+import pytest
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+
+from ydb.opentelemetry.tracing import _registry
+
+_provider = TracerProvider()
+_exporter = InMemorySpanExporter()
+_provider.add_span_processor(SimpleSpanProcessor(_exporter))
+trace.set_tracer_provider(_provider)
+
+
+@pytest.fixture()
+def otel_setup():
+ """Enable SDK tracing, yield the exporter, then restore noop defaults.
+
+ Each test gets a clean exporter (cleared before and after).
+ """
+ import ydb.opentelemetry._plugin as _plugin
+
+ _exporter.clear()
+
+ _plugin._enabled = False
+ _plugin._tracer = None
+
+ from ydb.opentelemetry import enable_tracing
+
+ enable_tracing()
+
+ yield _exporter
+
+ # Restore noop state
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _plugin._enabled = False
+ _plugin._tracer = None
+ _exporter.clear()
+
+
+class FakeDriverConfig:
+ def __init__(self, endpoint="test_endpoint:1337", database="/test_database"):
+ self.endpoint = endpoint
+ self.database = database
+ self.query_client_settings = None
+ self.tracer = None
diff --git a/tests/tracing/test_tracing_async.py b/tests/tracing/test_tracing_async.py
new file mode 100644
index 00000000..e3744341
--- /dev/null
+++ b/tests/tracing/test_tracing_async.py
@@ -0,0 +1,243 @@
+"""Unit tests for OpenTelemetry tracing — asynchronous SDK operations.
+
+Mirrors the sync tests but exercises the async code paths in ydb.aio.query.
+"""
+
+from opentelemetry.trace import StatusCode, SpanKind
+from ydb.query.transaction import QueryTxStateEnum
+from .conftest import FakeDriverConfig
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import asyncio
+import pytest
+
+
+async def _empty_async_iter():
+ return
+ yield # noqa: makes this an async generator
+
+
+def _get_spans(exporter, name=None):
+ spans = exporter.get_finished_spans()
+ if name is not None:
+ spans = [s for s in spans if s.name == name]
+ return spans
+
+
+def _get_single_span(exporter, name):
+ spans = _get_spans(exporter, name)
+ assert (
+ len(spans) == 1
+ ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}"
+ return spans[0]
+
+
+def _make_async_session_mock(driver_config=None):
+ """Create a mock that behaves like an async QuerySession after create()."""
+ cfg = driver_config or FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+
+ session = MagicMock()
+ session._driver = driver
+ session._session_id = "test-session-id"
+ session._node_id = 12345
+ session.session_id = "test-session-id"
+ session.node_id = 12345
+ return session, driver
+
+
+def _make_async_tx(session, driver):
+ """Create a real async QueryTxContext wired to mocked session/driver."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.aio.query.transaction import QueryTxContext
+
+ tx = QueryTxContext(driver, session, QuerySerializableReadWrite())
+ tx._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx._tx_state.tx_id = "test-tx-id"
+ return tx
+
+
+class TestAsyncCreateSessionSpan:
+ @pytest.mark.asyncio
+ async def test_create_session_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = None
+ qs._closed = False
+
+ with patch.object(QuerySession, "_create_call", new_callable=AsyncMock):
+ with patch.object(QuerySession, "_attach", new_callable=AsyncMock):
+ await qs.create()
+
+ span = _get_single_span(exporter, "ydb.CreateSession")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+
+
+class TestAsyncExecuteQuerySpan:
+ @pytest.mark.asyncio
+ async def test_session_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ fake_stream = _empty_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1;")
+ async for _ in result:
+ pass
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ attrs = dict(span.attributes)
+ assert attrs["ydb.session.id"] == "test-session-id"
+ assert attrs["ydb.node.id"] == 12345
+
+ @pytest.mark.asyncio
+ async def test_tx_execute_emits_span_with_tx_id(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock()
+ tx = _make_async_tx(session, driver)
+
+ fake_stream = _empty_async_iter()
+ with patch.object(type(tx), "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ tx._prev_stream = None
+ result = await tx.execute("SELECT 1;")
+ async for _ in result:
+ pass
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ attrs = dict(span.attributes)
+ assert attrs["ydb.tx.id"] == "test-tx-id"
+ assert attrs["ydb.session.id"] == "test-session-id"
+ assert attrs["ydb.node.id"] == 12345
+
+
+class TestAsyncCommitSpan:
+ @pytest.mark.asyncio
+ async def test_commit_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock()
+ tx = _make_async_tx(session, driver)
+
+ with patch.object(type(tx), "_commit_call", new_callable=AsyncMock):
+ await tx.commit()
+
+ span = _get_single_span(exporter, "ydb.Commit")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["ydb.tx.id"] == "test-tx-id"
+ assert attrs["ydb.session.id"] == "test-session-id"
+
+
+class TestAsyncRollbackSpan:
+ @pytest.mark.asyncio
+ async def test_rollback_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_async_session_mock()
+ tx = _make_async_tx(session, driver)
+
+ with patch.object(type(tx), "_rollback_call", new_callable=AsyncMock):
+ await tx.rollback()
+
+ span = _get_single_span(exporter, "ydb.Rollback")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["ydb.tx.id"] == "test-tx-id"
+ assert attrs["ydb.session.id"] == "test-session-id"
+
+
+class TestAsyncErrorHandling:
+ @pytest.mark.asyncio
+ async def test_error_sets_error_status_and_attributes(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb import issues
+
+ exc = issues.SchemeError("Table not found")
+
+ from ydb.aio.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, side_effect=exc):
+ with pytest.raises(issues.SchemeError):
+ await qs.execute("SELECT * FROM non_existing_table")
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+ assert len(span.events) > 0
+
+
+class TestAsyncConcurrentSpansIsolation:
+ @pytest.mark.asyncio
+ async def test_parallel_executes_do_not_become_parent_child(self, otel_setup):
+ """Two concurrent execute calls must produce sibling spans, not parent-child."""
+ exporter = otel_setup
+
+ from ydb.aio.query.session import QuerySession
+
+ async def _slow_async_iter():
+ await asyncio.sleep(0.5)
+ return
+ yield # noqa
+
+ def _make_session():
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 1
+ qs._closed = False
+ return qs
+
+ async def do_execute(qs):
+ fake_stream = _slow_async_iter()
+ with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream):
+ result = await qs.execute("SELECT 1")
+ async for _ in result:
+ pass
+
+ qs1 = _make_session()
+ qs2 = _make_session()
+ await asyncio.gather(do_execute(qs1), do_execute(qs2))
+
+ spans = _get_spans(exporter, "ydb.ExecuteQuery")
+ assert len(spans) == 2
+
+ ids = {s.context.span_id for s in spans}
+ for s in spans:
+ if s.parent is not None:
+ assert s.parent.span_id not in ids, "Concurrent spans must be siblings, not parent-child"
diff --git a/tests/tracing/test_tracing_sync.py b/tests/tracing/test_tracing_sync.py
new file mode 100644
index 00000000..f149f0c8
--- /dev/null
+++ b/tests/tracing/test_tracing_sync.py
@@ -0,0 +1,287 @@
+"""Unit tests for OpenTelemetry tracing — synchronous SDK operations.
+
+Uses an in-memory span exporter to verify that correct spans, attributes,
+parent-child relationships, and error handling are produced by the SDK.
+No real YDB connection is needed.
+"""
+
+from unittest.mock import MagicMock, patch
+from opentelemetry import trace
+from opentelemetry.trace import StatusCode, SpanKind
+from ydb.opentelemetry.tracing import _registry, create_ydb_span
+from ydb.query.transaction import QueryTxStateEnum
+from .conftest import FakeDriverConfig
+
+import pytest
+
+
+def _get_spans(exporter, name=None):
+ spans = exporter.get_finished_spans()
+ if name is not None:
+ spans = [s for s in spans if s.name == name]
+ return spans
+
+
+def _get_single_span(exporter, name):
+ spans = _get_spans(exporter, name)
+ assert (
+ len(spans) == 1
+ ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}"
+ return spans[0]
+
+
+def _make_session_mock(driver_config=None):
+ """Create a mock that behaves like a sync QuerySession after create()."""
+ cfg = driver_config or FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+
+ session = MagicMock()
+ session._driver = driver
+ session._session_id = "test-session-id"
+ session._node_id = 12345
+ session.session_id = "test-session-id"
+ session.node_id = 12345
+ return session, driver
+
+
+def _make_tx(session, driver):
+ """Create a real QueryTxContext wired to mocked session/driver."""
+ from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite
+ from ydb.query.transaction import QueryTxContext
+
+ tx = QueryTxContext(driver, session, QuerySerializableReadWrite())
+ # Simulate that the transaction has been started (so commit/rollback create spans)
+ tx._tx_state._change_state(QueryTxStateEnum.BEGINED)
+ tx._tx_state.tx_id = "test-tx-id"
+ return tx
+
+
+class TestCreateSessionSpan:
+ def test_create_session_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = None
+ qs._closed = False
+
+ with patch.object(QuerySession, "_create_call", return_value=None):
+ with patch.object(QuerySession, "_attach", return_value=None):
+ qs.create()
+
+ span = _get_single_span(exporter, "ydb.CreateSession")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+ assert span.status.status_code == StatusCode.UNSET
+
+
+class TestExecuteQuerySpan:
+ def test_session_execute_emits_span(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ fake_stream = iter([]) # empty stream that raises StopIteration immediately
+ with patch.object(QuerySession, "_execute_call", return_value=fake_stream):
+ result = qs.execute("SELECT 1;")
+ # Consume the iterator to finish the span
+ list(result)
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+ assert attrs["server.address"] == "test_endpoint"
+ assert attrs["server.port"] == 1337
+ assert attrs["ydb.session.id"] == "test-session-id"
+ assert attrs["ydb.node.id"] == 12345
+
+ def test_tx_execute_emits_span_with_tx_id(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock()
+ tx = _make_tx(session, driver)
+
+ fake_stream = iter([])
+ with patch.object(type(tx), "_execute_call", return_value=fake_stream):
+ tx._prev_stream = None
+ result = tx.execute("SELECT 1;")
+ list(result)
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ attrs = dict(span.attributes)
+ assert attrs["ydb.tx.id"] == "test-tx-id"
+ assert attrs["ydb.session.id"] == "test-session-id"
+ assert attrs["ydb.node.id"] == 12345
+
+
+class TestCommitSpan:
+ def test_commit_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock()
+ tx = _make_tx(session, driver)
+
+ with patch.object(type(tx), "_commit_call", return_value=None):
+ tx.commit()
+
+ span = _get_single_span(exporter, "ydb.Commit")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["ydb.tx.id"] == "test-tx-id"
+ assert attrs["ydb.session.id"] == "test-session-id"
+ assert attrs["ydb.node.id"] == 12345
+
+
+class TestRollbackSpan:
+ def test_rollback_emits_span(self, otel_setup):
+ exporter = otel_setup
+ session, driver = _make_session_mock()
+ tx = _make_tx(session, driver)
+
+ with patch.object(type(tx), "_rollback_call", return_value=None):
+ tx.rollback()
+
+ span = _get_single_span(exporter, "ydb.Rollback")
+ assert span.kind == SpanKind.CLIENT
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["ydb.tx.id"] == "test-tx-id"
+ assert attrs["ydb.session.id"] == "test-session-id"
+ assert attrs["ydb.node.id"] == 12345
+
+
+class TestErrorHandling:
+ def test_error_sets_error_status_and_attributes(self, otel_setup):
+ exporter = otel_setup
+
+ from ydb import issues
+
+ exc = issues.SchemeError("Table not found")
+
+ from ydb.query.session import QuerySession
+
+ qs = QuerySession.__new__(QuerySession)
+ cfg = FakeDriverConfig()
+ driver = MagicMock()
+ driver._driver_config = cfg
+ qs._driver = driver
+ qs._session_id = "test-session-id"
+ qs._node_id = 12345
+ qs._closed = False
+
+ with patch.object(QuerySession, "_execute_call", side_effect=exc):
+ with pytest.raises(issues.SchemeError):
+ qs.execute("SELECT * FROM non_existing_table")
+
+ span = _get_single_span(exporter, "ydb.ExecuteQuery")
+ assert span.status.status_code == StatusCode.ERROR
+ attrs = dict(span.attributes)
+ assert attrs["error.type"] == "ydb_error"
+ assert attrs["db.response.status_code"] == "SCHEME_ERROR"
+ assert len(span.events) > 0
+
+
+class TestNoSpansWhenDisabled:
+ def test_no_spans_without_enable_tracing(self):
+ """Without enable_tracing(), the registry uses noop — no spans are created."""
+
+ from tests.tracing.conftest import _exporter
+
+ _registry.set_create_span(None)
+ _registry.set_metadata_hook(None)
+ _exporter.clear()
+
+ with create_ydb_span("ydb.CreateSession", FakeDriverConfig()):
+ pass
+
+ assert len(_exporter.get_finished_spans()) == 0
+
+
+class TestParentChildRelationship:
+ def test_sdk_span_is_child_of_user_span(self, otel_setup):
+ exporter = otel_setup
+
+ tracer = trace.get_tracer("test.tracer")
+
+ with tracer.start_as_current_span("user.operation"):
+ with create_ydb_span("ydb.ExecuteQuery", FakeDriverConfig(), session_id="s1", node_id=1):
+ pass
+
+ spans = exporter.get_finished_spans()
+ ydb_span = next(s for s in spans if s.name == "ydb.ExecuteQuery")
+ user_span = next(s for s in spans if s.name == "user.operation")
+
+ assert ydb_span.parent is not None
+ assert ydb_span.parent.span_id == user_span.context.span_id
+ assert ydb_span.context.trace_id == user_span.context.trace_id
+
+
+class TestTraceMetadataInjection:
+ def test_get_trace_metadata_returns_traceparent(self, otel_setup):
+ from ydb.opentelemetry.tracing import get_trace_metadata
+
+ tracer = trace.get_tracer("test.tracer")
+
+ with tracer.start_as_current_span("test.span"):
+ metadata = get_trace_metadata()
+
+ keys = [k for k, v in metadata]
+ assert "traceparent" in keys
+
+
+class TestDriverInitializeSpan:
+ def test_driver_initialize_emits_internal_span(self, otel_setup):
+ exporter = otel_setup
+
+ cfg = FakeDriverConfig()
+
+ with create_ydb_span("ydb.Driver.Initialize", cfg, kind="internal"):
+ pass
+
+ span = _get_single_span(exporter, "ydb.Driver.Initialize")
+ assert span.kind == SpanKind.INTERNAL
+ attrs = dict(span.attributes)
+ assert attrs["db.system.name"] == "ydb"
+ assert attrs["db.namespace"] == "/test_database"
+
+
+class TestCommonAttributes:
+ @pytest.mark.parametrize(
+ "endpoint,expected_host,expected_port",
+ [
+ ("grpc://host.example.com:2136", "grpc://host.example.com", 2136),
+ ("localhost:2136", "localhost", 2136),
+ ],
+ )
+ def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_port):
+ exporter = otel_setup
+ cfg = FakeDriverConfig(endpoint=endpoint, database="/mydb")
+
+ with create_ydb_span("ydb.Test", cfg):
+ pass
+
+ span = _get_single_span(exporter, "ydb.Test")
+ attrs = dict(span.attributes)
+ assert attrs["server.address"] == expected_host
+ assert attrs["server.port"] == expected_port
+ assert attrs["db.namespace"] == "/mydb"
diff --git a/ydb/aio/connection.py b/ydb/aio/connection.py
index 9e03450d..a3cf2ffc 100644
--- a/ydb/aio/connection.py
+++ b/ydb/aio/connection.py
@@ -26,6 +26,7 @@
from ydb.driver import DriverConfig
from ydb.settings import BaseRequestSettings
from ydb import issues
+from ydb.opentelemetry.tracing import get_trace_metadata
# Workaround for good IDE and universal for runtime
if TYPE_CHECKING:
@@ -71,6 +72,9 @@ async def _construct_metadata(
metadata.append((YDB_REQUEST_TYPE_HEADER, settings.request_type))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -157,7 +161,6 @@ def __init__(
driver_config: Optional[DriverConfig] = None,
endpoint_options: Optional[EndpointOptions] = None,
) -> None:
- global _stubs_list
self.endpoint = endpoint
self.endpoint_key = EndpointKey(self.endpoint, getattr(endpoint_options, "node_id", None))
self.node_id = getattr(endpoint_options, "node_id", None)
diff --git a/ydb/aio/pool.py b/ydb/aio/pool.py
index fe709133..4f1b0cdd 100644
--- a/ydb/aio/pool.py
+++ b/ydb/aio/pool.py
@@ -6,6 +6,7 @@
from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING
from ydb import issues
+from ydb.opentelemetry.tracing import create_ydb_span
from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool
from .connection import Connection, EndpointKey
@@ -285,7 +286,8 @@ async def __wrapper__() -> None:
return __wrapper__
async def wait(self, timeout: Optional[float] = 7.0, fail_fast: bool = False) -> None: # type: ignore[override] # async override of sync method
- await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
+ with create_ydb_span("ydb.Driver.Initialize", self._driver_config, kind="internal"):
+ await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0)
def discovery_debug_details(self) -> str:
if self._discovery:
diff --git a/ydb/aio/query/base.py b/ydb/aio/query/base.py
index 66df3703..cbf22e98 100644
--- a/ydb/aio/query/base.py
+++ b/ydb/aio/query/base.py
@@ -2,9 +2,10 @@
class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ def __init__(self, it, wrapper, on_error=None, span=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._span = span
async def __aenter__(self) -> "AsyncResponseContextIterator":
return self
@@ -12,12 +13,27 @@ async def __aenter__(self) -> "AsyncResponseContextIterator":
async def _next(self):
try:
return await super()._next()
+ except StopAsyncIteration:
+ self._finish_span()
+ raise
except Exception as e:
if self._on_error:
self._on_error(e)
+ self._finish_span(e)
raise e
+ def _finish_span(self, exception=None):
+ if self._span is not None:
+ if exception is not None:
+ self._span.set_error(exception)
+ self._span.end()
+ self._span = None
+
+ def __del__(self):
+ self._finish_span()
+
async def __aexit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end
async for _ in self:
pass
+ self._finish_span()
diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py
index 67e62ff6..4c8c1c99 100644
--- a/ydb/aio/query/session.py
+++ b/ydb/aio/query/session.py
@@ -19,6 +19,7 @@
from ...query import base
from ...query.session import BaseQuerySession
+from ...opentelemetry.tracing import create_ydb_span
from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
@@ -105,8 +106,9 @@ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "Query
if self._closed:
raise RuntimeError("Session is already closed")
- await self._create_call(settings=settings)
- await self._attach()
+ with create_ydb_span("ydb.CreateSession", self._driver_config):
+ await self._create_call(settings=settings)
+ await self._attach()
return self
@@ -159,30 +161,41 @@ async def execute(
"""
self._check_session_ready_to_use()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery", self._driver_config, session_id=self._session_id, node_id=self._node_id
)
- return AsyncResponseContextIterator(
- it=stream_it,
- wrapper=lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self,
- settings=self._settings,
- ),
- on_error=self._on_execute_stream_error,
- )
+ try:
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ return AsyncResponseContextIterator(
+ it=stream_it,
+ wrapper=lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self,
+ settings=self._settings,
+ ),
+ on_error=self._on_execute_stream_error,
+ span=span,
+ )
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
async def explain(
self,
diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py
index 69c77478..cd764067 100644
--- a/ydb/aio/query/transaction.py
+++ b/ydb/aio/query/transaction.py
@@ -12,6 +12,7 @@
BaseQueryTxContext,
QueryTxStateEnum,
)
+from ...opentelemetry.tracing import create_ydb_span
if TYPE_CHECKING:
from .session import QuerySession
@@ -106,13 +107,20 @@ async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
- await self._commit_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Commit",
+ self._driver_config,
+ session_id=self.session.session_id,
+ node_id=self.session.node_id,
+ tx_id=self._tx_state.tx_id,
+ ):
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT)
+ await self._commit_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -133,13 +141,20 @@ async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None
await self._ensure_prev_stream_finished()
- try:
- await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
- await self._rollback_call(settings)
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e:
- await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Rollback",
+ self._driver_config,
+ session_id=self.session.session_id,
+ node_id=self.session.node_id,
+ tx_id=self._tx_state.tx_id,
+ ):
+ try:
+ await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK)
+ await self._rollback_call(settings)
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e:
+ await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
async def execute(
self,
@@ -187,30 +202,45 @@ async def execute(
"""
await self._ensure_prev_stream_finished()
- stream_it = await self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery",
+ self._driver_config,
+ session_id=self.session.session_id,
+ node_id=self.session.node_id,
+ tx_id=self._tx_state.tx_id,
)
- self._prev_stream = AsyncResponseContextIterator(
- it=stream_it,
- wrapper=lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self.session,
- tx=self,
+ try:
+ stream_it = await self._execute_call(
+ query=query,
+ parameters=parameters,
commit_tx=commit_tx,
- settings=self.session._settings,
- ),
- on_error=self.session._on_execute_stream_error,
- )
- return self._prev_stream
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ self._prev_stream = AsyncResponseContextIterator(
+ it=stream_it,
+ wrapper=lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self.session,
+ tx=self,
+ commit_tx=commit_tx,
+ settings=self.session._settings,
+ ),
+ on_error=self.session._on_execute_stream_error,
+ span=span,
+ )
+ return self._prev_stream
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
diff --git a/ydb/connection.py b/ydb/connection.py
index 98fbd5aa..d1bcfdf5 100644
--- a/ydb/connection.py
+++ b/ydb/connection.py
@@ -24,6 +24,7 @@
import grpc
from . import issues, _apis, _utilities
from . import default_pem
+from .opentelemetry.tracing import get_trace_metadata
_stubs_list = (
_apis.TableService.Stub,
@@ -179,6 +180,9 @@ def _construct_metadata(driver_config, settings):
metadata.extend(getattr(settings, "headers", []))
metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ())))
+
+ metadata.extend(get_trace_metadata())
+
return metadata
@@ -419,7 +423,6 @@ def __init__(
discovered by the YDB endpoint discovery mechanism
:param driver_config: A driver config instance to be used for RPC call interception
"""
- global _stubs_list
self.endpoint = endpoint
self.node_id = getattr(endpoint_options, "node_id", None)
self.endpoint_key = EndpointKey(endpoint, getattr(endpoint_options, "node_id", None))
diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py
new file mode 100644
index 00000000..1ea6d6c8
--- /dev/null
+++ b/ydb/opentelemetry/__init__.py
@@ -0,0 +1,18 @@
+def enable_tracing(tracer=None):
+ """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls.
+
+ Args:
+ tracer: Optional OTel tracer to use. If not provided, the default tracer from the global tracer provider will be used.
+ """
+ try:
+ from ydb.opentelemetry._plugin import _enable_tracing
+ except ImportError:
+ raise ImportError(
+ "OpenTelemetry packages are required for tracing support. "
+ "Install them with: pip install ydb[opentelemetry]"
+ ) from None
+
+ _enable_tracing(tracer)
+
+
+__all__ = ["enable_tracing"]
diff --git a/ydb/opentelemetry/_plugin.py b/ydb/opentelemetry/_plugin.py
new file mode 100644
index 00000000..e77b532b
--- /dev/null
+++ b/ydb/opentelemetry/_plugin.py
@@ -0,0 +1,97 @@
+from opentelemetry import context, trace
+from opentelemetry.propagate import inject
+from opentelemetry.trace import StatusCode
+
+from ydb import issues
+from ydb.issues import StatusCode as YdbStatusCode
+from ydb.opentelemetry.tracing import _registry
+
+_TRANSPORT_STATUSES = frozenset(
+ {
+ YdbStatusCode.CONNECTION_LOST,
+ YdbStatusCode.CONNECTION_FAILURE,
+ YdbStatusCode.DEADLINE_EXCEEDED,
+ YdbStatusCode.CLIENT_INTERNAL_ERROR,
+ YdbStatusCode.UNIMPLEMENTED,
+ }
+)
+
+_tracer = None
+_enabled = False
+
+_KIND_MAP = {
+ "client": trace.SpanKind.CLIENT,
+ "internal": trace.SpanKind.INTERNAL,
+}
+
+
+def _otel_metadata_hook():
+ """Injects W3C Trace Context (traceparent/tracestate) into gRPC metadata."""
+ headers = {}
+ inject(headers)
+ return list(headers.items())
+
+
+def _set_error_on_span(span, exception):
+ if isinstance(exception, issues.Error) and exception.status is not None:
+ span.set_attribute("db.response.status_code", exception.status.name)
+ error_type = "transport_error" if exception.status in _TRANSPORT_STATUSES else "ydb_error"
+ else:
+ error_type = type(exception).__qualname__
+
+ span.set_attribute("error.type", error_type)
+ span.set_status(StatusCode.ERROR, str(exception))
+ span.record_exception(exception)
+
+
+class TracingSpan:
+ """Wrapper around an OTel span that manages context lifecycle.
+
+    Can be used as a context manager, or driven manually via set_error() and end().
+ """
+
+ def __init__(self, span, token):
+ self._span = span
+ self._token = token
+
+ def set_error(self, exception):
+ _set_error_on_span(self._span, exception)
+
+ def end(self):
+ self._span.end()
+ if self._token is not None:
+ context.detach(self._token)
+ self._token = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_val is not None:
+ self.set_error(exc_val)
+ self.end()
+ return False
+
+
+def _create_span(name, attributes=None, kind=None):
+    # The returned TracingSpan can be used as a context manager or ended manually via end().
+ span = _tracer.start_span(
+ name,
+ kind=_KIND_MAP.get(kind, trace.SpanKind.CLIENT),
+ attributes=attributes or {},
+ )
+ ctx = trace.set_span_in_context(span)
+ token = context.attach(ctx)
+ return TracingSpan(span, token)
+
+
+def _enable_tracing(tracer=None):
+ global _enabled, _tracer
+
+ if _enabled:
+ return
+
+ _tracer = tracer if tracer is not None else trace.get_tracer("ydb.sdk")
+ _enabled = True
+ _registry.set_metadata_hook(_otel_metadata_hook)
+ _registry.set_create_span(_create_span)
diff --git a/ydb/opentelemetry/tracing.py b/ydb/opentelemetry/tracing.py
new file mode 100644
index 00000000..07a0ead7
--- /dev/null
+++ b/ydb/opentelemetry/tracing.py
@@ -0,0 +1,80 @@
+class _NoopSpan:
+ """Returned by create_ydb_span when tracing is disabled."""
+
+ def set_error(self, exception):
+ pass
+
+ def end(self):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+
+_NOOP_SPAN = _NoopSpan()
+
+
+class OtelTracingRegistry:
+ """Singleton registry for OpenTelemetry tracing.
+
+ By default everything is no-op until :func:`enable_tracing` is called.
+ """
+
+ def __init__(self):
+ self._metadata_hook = None
+ self._create_span_func = None
+
+ def create_span(self, name, attributes=None, kind=None):
+ """Create a span. Returns a TracingSpan or _NoopSpan."""
+ if self._create_span_func is None:
+ return _NOOP_SPAN
+ return self._create_span_func(name, attributes, kind=kind)
+
+ def get_trace_metadata(self):
+ """Return tracing metadata (e.g. W3C traceparent) for gRPC calls."""
+ if self._metadata_hook is not None:
+ return self._metadata_hook()
+ return []
+
+ def set_metadata_hook(self, hook):
+ self._metadata_hook = hook
+
+ def set_create_span(self, func):
+ self._create_span_func = func
+
+
+_registry = OtelTracingRegistry()
+
+
+def get_trace_metadata():
+ """Return tracing metadata for gRPC calls."""
+ return _registry.get_trace_metadata()
+
+
+def _build_ydb_attrs(driver_config, session_id=None, node_id=None, tx_id=None):
+ endpoint = getattr(driver_config, "endpoint", None) or ""
+ host, _, port = endpoint.rpartition(":")
+ attrs = {
+ "db.system.name": "ydb",
+ "db.namespace": getattr(driver_config, "database", None) or "",
+ "server.address": host,
+ "server.port": int(port) if port.isdigit() else 0,
+ }
+ if session_id is not None:
+ attrs["ydb.session.id"] = session_id or ""
+ if node_id is not None:
+ attrs["ydb.node.id"] = node_id or 0
+ if tx_id is not None:
+ attrs["ydb.tx.id"] = tx_id or ""
+ return attrs
+
+
+def create_ydb_span(name, driver_config, session_id=None, node_id=None, tx_id=None, kind=None):
+ """Create a span pre-filled with standard YDB attributes.
+ Can be used as a context manager or manually.
+ """
+ attrs = _build_ydb_attrs(driver_config, session_id, node_id, tx_id)
+ return _registry.create_span(name, attributes=attrs, kind=kind)
diff --git a/ydb/pool.py b/ydb/pool.py
index 2901c573..4fef6377 100644
--- a/ydb/pool.py
+++ b/ydb/pool.py
@@ -10,6 +10,7 @@
from typing import Any, Callable, ContextManager, List, Optional, Set, Tuple, TYPE_CHECKING
from . import connection as connection_impl, issues, resolver, _utilities, tracing
+from .opentelemetry.tracing import create_ydb_span
from abc import abstractmethod
from .connection import Connection, EndpointKey
@@ -453,10 +454,11 @@ def wait(self, timeout: Optional[float] = None, fail_fast: bool = False) -> None
:param timeout: A timeout to wait in seconds
:return: None
"""
- if fail_fast:
- self._store.add_fast_fail().result(timeout)
- else:
- self._store.subscribe().result(timeout)
+ with create_ydb_span("ydb.Driver.Initialize", self._driver_config, kind="internal"):
+ if fail_fast:
+ self._store.add_fast_fail().result(timeout)
+ else:
+ self._store.subscribe().result(timeout)
def _on_disconnected(self, connection: Connection) -> None:
"""
diff --git a/ydb/query/base.py b/ydb/query/base.py
index e7764e1c..1aeb4f6b 100644
--- a/ydb/query/base.py
+++ b/ydb/query/base.py
@@ -27,7 +27,6 @@
from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop
from ydb._grpc.grpcwrapper.common_utils import to_thread
-
if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
from .session import BaseQuerySession
@@ -73,9 +72,10 @@ class QueryResultSetFormat(enum.IntEnum):
class SyncResponseContextIterator(_utilities.SyncResponseIterator):
- def __init__(self, it, wrapper, on_error=None):
+ def __init__(self, it, wrapper, on_error=None, span=None):
super().__init__(it, wrapper)
self._on_error = on_error
+ self._span = span
def __enter__(self) -> "SyncResponseContextIterator":
return self
@@ -83,15 +83,30 @@ def __enter__(self) -> "SyncResponseContextIterator":
def _next(self):
try:
return super()._next()
+ except StopIteration:
+ self._finish_span()
+ raise
except Exception as e:
if self._on_error:
self._on_error(e)
+ self._finish_span(e)
raise e
+ def _finish_span(self, exception=None):
+ if self._span is not None:
+ if exception is not None:
+ self._span.set_error(exception)
+ self._span.end()
+ self._span = None
+
+ def __del__(self):
+ self._finish_span()
+
def __exit__(self, exc_type, exc_val, exc_tb):
# To close stream on YDB it is necessary to scroll through it to the end
for _ in self:
pass
+ self._finish_span()
class QueryClientSettings:
diff --git a/ydb/query/session.py b/ydb/query/session.py
index b21c6ba4..3b546f76 100644
--- a/ydb/query/session.py
+++ b/ydb/query/session.py
@@ -18,6 +18,7 @@
from .base import QueryExplainResultFormat
from .. import _apis, issues, _utilities
+from ..opentelemetry.tracing import create_ydb_span
from ..settings import BaseRequestSettings
from ..connection import _RpcState as RpcState, EndpointKey
from .._grpc.grpcwrapper import common_utils
@@ -30,7 +31,7 @@
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT
if TYPE_CHECKING:
- from ..driver import Driver as SyncDriver
+ from ..driver import Driver as SyncDriver, DriverConfig
from ..aio.driver import Driver as AsyncDriver
@@ -84,6 +85,10 @@ def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings]
self._last_query_stats = None
+ @property
+ def _driver_config(self) -> Optional["DriverConfig"]:
+ return getattr(self._driver, "_driver_config", None)
+
@property
def session_id(self) -> Optional[str]:
return self._session_id
@@ -368,8 +373,9 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio
if self._closed:
raise RuntimeError("Session is already closed.")
- self._create_call(settings=settings)
- self._attach()
+ with create_ydb_span("ydb.CreateSession", self._driver_config):
+ self._create_call(settings=settings)
+ self._attach()
return self
@@ -435,30 +441,41 @@ def execute(
"""
self._check_session_ready_to_use()
- stream_it = self._execute_call(
- query=query,
- parameters=parameters,
- commit_tx=True,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery", self._driver_config, session_id=self._session_id, node_id=self._node_id
)
- return base.SyncResponseContextIterator(
- stream_it,
- lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self,
- settings=self._settings,
- ),
- on_error=self._on_execute_stream_error,
- )
+ try:
+ stream_it = self._execute_call(
+ query=query,
+ parameters=parameters,
+ commit_tx=True,
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ return base.SyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self,
+ settings=self._settings,
+ ),
+ on_error=self._on_execute_stream_error,
+ span=span,
+ )
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise
def explain(
self,
diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py
index 687a5eaf..f96b7788 100644
--- a/ydb/query/transaction.py
+++ b/ydb/query/transaction.py
@@ -17,6 +17,7 @@
_apis,
issues,
)
+from ..opentelemetry.tracing import create_ydb_span
from .._grpc.grpcwrapper import ydb_topic as _ydb_topic
from .._grpc.grpcwrapper import ydb_query as _ydb_query
from ..connection import _RpcState as RpcState
@@ -244,6 +245,10 @@ def __init__(self, driver: DriverT, session: "BaseQuerySession", tx_mode: base.B
self._external_error = None
self._last_query_stats = None
+ @property
+ def _driver_config(self):
+ return getattr(self._driver, "_driver_config", None)
+
@property
def session_id(self) -> Optional[str]:
"""
@@ -553,13 +558,20 @@ def commit(self, settings: Optional[BaseRequestSettings] = None) -> None:
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
- self._commit_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Commit",
+ self._driver_config,
+ session_id=self.session.session_id,
+ node_id=self.session.node_id,
+ tx_id=self._tx_state.tx_id,
+ ):
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT)
+ self._commit_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e)
+ raise e
def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
"""Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution
@@ -579,13 +591,20 @@ def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None:
self._ensure_prev_stream_finished()
- try:
- self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
- self._rollback_call(settings)
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
- except BaseException as e: # TODO: probably should be less wide
- self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
- raise e
+ with create_ydb_span(
+ "ydb.Rollback",
+ self._driver_config,
+ session_id=self.session.session_id,
+ node_id=self.session.node_id,
+ tx_id=self._tx_state.tx_id,
+ ):
+ try:
+ self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK)
+ self._rollback_call(settings)
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None)
+ except BaseException as e: # TODO: probably should be less wide
+ self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e)
+ raise e
def execute(
self,
@@ -634,30 +653,45 @@ def execute(
"""
self._ensure_prev_stream_finished()
- stream_it = self._execute_call(
- query=query,
- commit_tx=commit_tx,
- syntax=syntax,
- exec_mode=exec_mode,
- stats_mode=stats_mode,
- schema_inclusion_mode=schema_inclusion_mode,
- result_set_format=result_set_format,
- arrow_format_settings=arrow_format_settings,
- parameters=parameters,
- concurrent_result_sets=concurrent_result_sets,
- settings=settings,
+ span = create_ydb_span(
+ "ydb.ExecuteQuery",
+ self._driver_config,
+ session_id=self.session.session_id,
+ node_id=self.session.node_id,
+ tx_id=self._tx_state.tx_id,
)
- self._prev_stream = base.SyncResponseContextIterator(
- stream_it,
- lambda resp: base.wrap_execute_query_response(
- rpc_state=None,
- response_pb=resp,
- session=self.session,
- tx=self,
+ try:
+ stream_it = self._execute_call(
+ query=query,
commit_tx=commit_tx,
- settings=self.session._settings,
- ),
- on_error=self.session._on_execute_stream_error,
- )
- return self._prev_stream
+ syntax=syntax,
+ exec_mode=exec_mode,
+ stats_mode=stats_mode,
+ schema_inclusion_mode=schema_inclusion_mode,
+ result_set_format=result_set_format,
+ arrow_format_settings=arrow_format_settings,
+ parameters=parameters,
+ concurrent_result_sets=concurrent_result_sets,
+ settings=settings,
+ )
+
+ self._prev_stream = base.SyncResponseContextIterator(
+ stream_it,
+ lambda resp: base.wrap_execute_query_response(
+ rpc_state=None,
+ response_pb=resp,
+ session=self.session,
+ tx=self,
+ commit_tx=commit_tx,
+ settings=self.session._settings,
+ ),
+ on_error=self.session._on_execute_stream_error,
+ span=span,
+ )
+ return self._prev_stream
+ except Exception as e:
+ if span is not None:
+ span.set_error(e)
+ span.end()
+ raise