Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/twinkle_client/http/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Callable, Dict, Optional, Set

from .http_utils import http_post
from .utils import TWINKLE_SERVER_URL
from .utils import get_base_url


class HeartbeatManager:
Expand Down Expand Up @@ -33,7 +33,6 @@ def __init__(self):
return

self._initialized = True
self.server_url = TWINKLE_SERVER_URL

# Processor heartbeat management
self.processor_ids: Set[str] = set()
Expand All @@ -52,7 +51,7 @@ def __init__(self):

def processor_heartbeat_func(self, processor_id_list: str):
response = http_post(
url=f'{self.server_url}/processors/heartbeat', json_data={'processor_id': processor_id_list})
url=f'{get_base_url()}/processors/heartbeat', json_data={'processor_id': processor_id_list})
response.raise_for_status()

def register_processor(self, processor_id: str):
Expand Down
19 changes: 14 additions & 5 deletions src/twinkle_client/http/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,33 @@
_base_url_context: ContextVar[Optional[str]] = ContextVar('base_url', default=None)
_api_key_context: ContextVar[Optional[str]] = ContextVar('api_key', default=None)

# Global fallback for base_url, accessible from all threads (including heartbeat daemon threads).
# ContextVar is thread-local and invisible to background threads, so we also store the URL here.
_global_base_url: Optional[str] = None

# Global static request ID shared across all threads
# This ensures heartbeat threads use the same request ID as the main training thread
_global_request_id: Optional[str] = None


def set_base_url(url: str):
"""Set the base URL for HTTP requests in the current context."""
_base_url_context.set(url.rstrip('/'))
"""Set the base URL for HTTP requests in the current context and globally."""
global _global_base_url
stripped = url.rstrip('/')
_base_url_context.set(stripped)
_global_base_url = stripped


def get_base_url() -> Optional[str]:
"""Get the current base URL from context or environment variable."""
return _base_url_context.get() or TWINKLE_SERVER_URL
"""Get the current base URL from context, global fallback, or environment variable."""
return _base_url_context.get() or _global_base_url or TWINKLE_SERVER_URL


def clear_base_url():
"""Clear the base URL context, falling back to environment variable."""
"""Clear the base URL context and global fallback, falling back to environment variable."""
global _global_base_url
_base_url_context.set(None)
_global_base_url = None


def set_api_key(api_key: str):
Expand Down
Loading