Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import app_state
from utils import ensure_https, get_gateway_host
from pat_rotator import PATRotator
from telemetry import log_telemetry, set_product_info

# Sanitize DATABRICKS_TOKEN early — the platform sometimes injects trailing
# newlines / whitespace which causes auth failures. Cleaning it here prevents
Expand Down Expand Up @@ -175,6 +176,7 @@ def _setup_git_config():
db_token = os.environ.get("DATABRICKS_TOKEN")
if db_host and db_token:
w = WorkspaceClient(host=db_host, token=db_token, auth_type="pat")
set_product_info(w)
me = w.current_user.me()
user_email = me.user_name
display_name = me.display_name or user_email.split("@")[0]
Expand Down Expand Up @@ -412,6 +414,7 @@ def get_token_owner():
if app_name:
try:
w = WorkspaceClient() # auto-detects SP credentials
set_product_info(w)
app = w.apps.get(name=app_name)
owner = (app.creator or "").lower()
logger.info(f"Owner resolved from app.creator: {owner}")
Expand All @@ -426,6 +429,7 @@ def get_token_owner():
if not host or not token:
return None
w = WorkspaceClient(host=host, token=token, auth_type="pat")
set_product_info(w)
username = w.current_user.me().user_name
return username.lower() if username else username
except Exception as e:
Expand Down Expand Up @@ -1060,6 +1064,9 @@ def create_session():
thread = threading.Thread(target=read_pty_output, args=(session_id, master_fd), daemon=True)
thread.start()

# Telemetry: track session creation with agent type
log_telemetry("agent", label or "shell")

return jsonify({"session_id": session_id})
except Exception as e:
return jsonify({"error": str(e)}), 500
Expand Down Expand Up @@ -1112,6 +1119,10 @@ def upload_file():

file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
logger.info(f"Upload saved: {file_path} ({file_size} bytes)")

# Telemetry: track file uploads
log_telemetry("event", "file_upload")

return jsonify({"path": file_path})


Expand Down Expand Up @@ -1269,6 +1280,9 @@ def initialize_app(local_dev=False):
os.environ.pop("DATABRICKS_CLIENT_SECRET", None)
logger.info("SP credentials stripped — PAT-only auth from this point")

# Telemetry: app startup ping (fire-and-forget in background thread)
log_telemetry("event", "app_startup")

# Start background cleanup thread
cleanup_thread = threading.Thread(target=cleanup_stale_sessions, daemon=True)
cleanup_thread.start()
Expand Down
7 changes: 7 additions & 0 deletions pat_rotator.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ def _rotate_once(self):
logger.info(f"INFO: PAT rotation complete — new token (id={new_token_id}, "
f"expires in {self._token_lifetime}s). First rotation — no old token to revoke.")

# Telemetry: track PAT rotation events (import here to avoid circular deps)
try:
from telemetry import log_telemetry
log_telemetry("event", "pat_rotation")
except Exception:
pass # Telemetry must never break rotation

return True

def revoke_bootstrap_token(self):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "coda"
version = "0.17.2"
version = "0.17.3"
description = "CoDA - Coding Agents on Databricks Apps"
requires-python = ">=3.10"
dependencies = [
Expand Down
11 changes: 11 additions & 0 deletions sync_to_workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def get_user_email():
if not host or not token:
raise RuntimeError("~/.databrickscfg missing host or token")
w = WorkspaceClient(host=host, token=token, auth_type="pat")
try:
from telemetry import set_product_info
set_product_info(w)
except Exception:
pass
return w.current_user.me().user_name


Expand Down Expand Up @@ -68,6 +73,12 @@ def sync_project(project_path: Path):

if result.returncode == 0:
print(f"✓ Synced to {workspace_dest}")
# Telemetry: track workspace sync events
try:
from telemetry import log_telemetry
log_telemetry("event", "workspace_sync")
except Exception:
pass # Telemetry must never break sync
else:
print(f"⚠ Sync warning: {result.stderr}", file=sys.stderr)

Expand Down
98 changes: 98 additions & 0 deletions telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Databricks Labs telemetry for CoDA.

Follows the DQX pattern: piggybacks telemetry on the Databricks SDK's
User-Agent header. Each log_telemetry() call creates a throwaway
WorkspaceClient, augments the User-Agent with key-value data, and fires
clusters.select_spark_version() to transmit the header to Databricks
servers where it's recorded.

All telemetry runs in background daemon threads -- never blocks the
Flask request path or terminal I/O.

Reference: https://github.com/databrickslabs/dqx/blob/main/src/databricks/labs/dqx/telemetry.py
"""

import functools
import logging
import os
import threading

import tomllib

logger = logging.getLogger(__name__)

_version_cache = None


def _get_version():
    """Return the CoDA version declared in the pyproject.toml beside this module.

    The value is memoized in the module-level ``_version_cache``; once it has
    been set, later calls return it without touching the filesystem. If the
    file cannot be opened or parsed, or has no ``[project].version`` entry,
    the fallback ``"0.0.0"`` is cached and returned instead, so version
    lookup never raises into telemetry callers.

    Returns:
        The version string from pyproject.toml, or ``"0.0.0"`` when it cannot
        be determined.
    """
    # NOTE(review): `tomllib` is part of the standard library only from
    # Python 3.11, while pyproject.toml declares requires-python ">=3.10" --
    # confirm the module-level `import tomllib` is safe on every supported
    # interpreter.
    global _version_cache
    if _version_cache is not None:
        return _version_cache
    try:
        pyproject = os.path.join(os.path.dirname(__file__), "pyproject.toml")
        # tomllib.load() requires a file object opened in binary mode.
        with open(pyproject, "rb") as f:
            _version_cache = tomllib.load(f)["project"]["version"]
    except Exception as e:
        # Deliberately broad (best-effort lookup), but record the reason so a
        # surprising "0.0.0" in telemetry can be diagnosed from debug logs.
        logger.debug("Could not read CoDA version from pyproject.toml: %s", e)
        _version_cache = "0.0.0"
    return _version_cache


def set_product_info(ws):
    """Tag a WorkspaceClient's config with the CoDA product identifier.

    Stores ``("coda", <version>)`` in ``ws.config._product_info`` unless the
    config already carries a product tuple whose first element is ``"coda"``,
    in which case the config is left untouched. The intent is that SDK calls
    made through ``ws`` are attributed to CoDA via the User-Agent header.
    """
    config = ws.config
    current = getattr(config, "_product_info", None)
    if current is not None and current[0] == "coda":
        # Already attributed to CoDA; keep whatever version is recorded.
        return
    config._product_info = ("coda", _get_version())


def log_telemetry(key, value):
    """Emit one telemetry key-value pair through the SDK's User-Agent header.

    The work is handed to a daemon thread named ``telemetry-<key>`` and this
    function returns right after starting it. The worker builds a default
    ``WorkspaceClient``, tags it with the CoDA product info, copies its config
    with ``key``/``value`` added as User-Agent extra, and calls
    ``clusters.select_spark_version()`` on a second client built from that
    copy in order to transmit the header. Failures are logged at debug level
    and never propagate to the caller.
    """

    def _transmit():
        try:
            # Imported lazily inside the try so a missing or broken SDK is
            # treated like any other telemetry failure.
            from databricks.sdk import WorkspaceClient
            from databricks.sdk.errors import DatabricksError

            base_client = WorkspaceClient()
            set_product_info(base_client)
            config_copy = base_client.config.copy()
            tagged_config = config_copy.with_user_agent_extra(key, value)
            carrier = WorkspaceClient(config=tagged_config)
            try:
                carrier.clusters.select_spark_version()
            except DatabricksError as e:
                logger.debug(f"Telemetry transmit failed: {e}")
        except Exception as e:
            logger.debug(f"Telemetry error ({key}={value}): {e}")

    worker = threading.Thread(
        target=_transmit,
        daemon=True,
        name=f"telemetry-{key}",
    )
    worker.start()


def telemetry_logger(key, value):
    """Build a decorator that reports ``key``/``value`` before each call.

    The returned decorator wraps any callable (plain function or method).
    On every invocation it first calls ``log_telemetry(key, value)`` and then
    runs the wrapped callable with the original arguments, returning its
    result. Any exception raised while reporting is discarded so telemetry
    can never break the wrapped callable.
    """

    def _emit_quietly():
        try:
            log_telemetry(key, value)
        except Exception:
            pass  # Telemetry must never break the wrapped function

    def decorate(target):
        def instrumented(*args, **kwargs):
            _emit_quietly()
            return target(*args, **kwargs)

        # Copy name, docstring, etc. from the wrapped callable onto the wrapper.
        return functools.update_wrapper(instrumented, target)

    return decorate
Loading
Loading