Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
LangfuseTool,
)
from ._version import __version__
from .media import LangfuseMedia, LangfuseMediaReference
from .span_filter import (
KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES,
is_default_export_span,
Expand All @@ -49,6 +50,8 @@

__all__ = [
"Langfuse",
"LangfuseMedia",
"LangfuseMediaReference",
"get_client",
"observe",
"propagate_attributes",
Expand Down
196 changes: 189 additions & 7 deletions langfuse/_client/client.py

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual (non-generated) changes

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import re
import urllib.parse
import uuid
import warnings
from datetime import datetime
from hashlib import sha256
Expand All @@ -19,6 +20,7 @@
List,
Literal,
Optional,
Tuple,
Type,
Union,
cast,
Expand All @@ -27,6 +29,7 @@

import backoff
import httpx
from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped]
from opentelemetry import context as otel_context_api
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
Expand Down Expand Up @@ -94,6 +97,7 @@
CreateTextPromptRequest,
Dataset,
DatasetItem,
DatasetItemMediaReferenceField,
DatasetRunWithItems,
DatasetStatus,
DeleteDatasetRunResponse,
Expand Down Expand Up @@ -126,7 +130,7 @@
_run_task,
)
from langfuse.logger import langfuse_logger
from langfuse.media import LangfuseMedia
from langfuse.media import LangfuseMedia, LangfuseMediaReference
from langfuse.model import (
ChatMessageDict,
ChatMessageWithPlaceholdersDict,
Expand Down Expand Up @@ -2322,15 +2326,17 @@ def get_dataset(
*,
fetch_items_page_size: Optional[int] = 50,
version: Optional[datetime] = None,
resolve_media_references: bool = False,
) -> "DatasetClient":
"""Fetch a dataset by its name.

Args:
name (str): The name of the dataset to fetch.
fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC).
name: The name of the dataset to fetch.
fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version: Retrieve dataset items as they existed at this specific point in time (UTC).
If provided, returns the state of items at the specified UTC timestamp.
If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC.
resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects.

Returns:
DatasetClient: The dataset with the given name.
Expand All @@ -2339,7 +2345,7 @@ def get_dataset(
langfuse_logger.debug(f"Getting datasets {name}")
dataset = self.api.datasets.get(dataset_name=self._url_encode(name))

dataset_items = []
dataset_items: List[DatasetItem] = []
page = 1

while True:
Expand All @@ -2348,8 +2354,16 @@ def get_dataset(
page=page,
limit=fetch_items_page_size,
version=version,
include_media_references=resolve_media_references or None,
)
dataset_items.extend(
[
self._hydrate_dataset_item_media_references(item)
for item in new_items.data
]
if resolve_media_references
else new_items.data
)
dataset_items.extend(new_items.data)

if new_items.meta.total_pages <= page:
break
Expand Down Expand Up @@ -3355,6 +3369,45 @@ def create_dataset_item(
try:
langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}")

# Media uploads must reference the (dataset, item) they belong to, and
# the item need not exist yet — so settle on the item id up front and
# reuse it for the create call below.
item_id = id if id is not None else str(uuid.uuid4())

# Single pass per field: swap each LangfuseMedia for its reference
# string (derived from content, not the upload) and collect the media
# still to upload, deduped by media id and tagged with its field.
pending_media: Dict[str, Tuple[LangfuseMedia, str]] = {}
input = self._process_dataset_item_media(
data=input,
pending_media=pending_media,
field=DatasetItemMediaReferenceField.INPUT.value,
)
expected_output = self._process_dataset_item_media(
data=expected_output,
pending_media=pending_media,
field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value,
)
metadata = self._process_dataset_item_media(
data=metadata,
pending_media=pending_media,
field=DatasetItemMediaReferenceField.METADATA.value,
)

# The upload needs the dataset id, but the create API only takes the
# name. Resolve it once, and only when there is actually media to
# upload — a plain item pays no extra datasets.get round-trip.
if pending_media:
assert self._resources is not None
dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id
for media, field in pending_media.values():
self._resources._media_manager._upload_media_sync(
media=media,
dataset_id=dataset_id,
dataset_item_id=item_id,
field=field,
)

result = self.api.dataset_items.create(
dataset_name=dataset_name,
input=input,
Expand All @@ -3363,14 +3416,143 @@ def create_dataset_item(
source_trace_id=source_trace_id,
source_observation_id=source_observation_id,
status=status,
id=id,
id=item_id,
)

return cast(DatasetItem, result)
except Error as e:
handle_fern_exception(e)
raise e

def _process_dataset_item_media(
    self,
    *,
    data: Any,
    pending_media: Dict[str, Tuple[LangfuseMedia, str]],
    field: str,
) -> Any:
    """Return ``data`` with every ``LangfuseMedia`` replaced by its reference string.

    Lists, sets, frozensets and dicts are rebuilt recursively; every other
    value (tuples included) is returned as-is. Each replaced media object is
    registered in ``pending_media`` under its media id together with ``field``
    so the caller can upload it afterwards. A media id that is already
    registered keeps its first entry.

    Args:
        data: Arbitrary (possibly nested) dataset item payload.
        pending_media: Mutable registry, keyed by media id, that this call
            fills with ``(media, field)`` pairs.
        field: Name of the dataset item field ``data`` belongs to.

    Returns:
        The processed payload, or ``data`` untouched when ``self._resources``
        is ``None``.

    Raises:
        ValueError: If a ``LangfuseMedia`` has no reference string or media id.
    """
    if self._resources is None:
        return data

    depth_limit = 10

    def _walk(node: Any, depth: int, open_container_ids: set[int]) -> Any:
        # Deliberately hand-rolled instead of using jsonpath-ng: dataset
        # writes should keep working under python -OO, where parser
        # docstrings may be stripped.
        if isinstance(node, LangfuseMedia):
            ref_string = node._reference_string
            media_key = node._media_id
            if ref_string is None or media_key is None:
                raise ValueError(
                    "Cannot create dataset item with invalid LangfuseMedia."
                )
            # The first field that mentions a media id owns the upload;
            # repeated occurrences are not registered again.
            pending_media.setdefault(media_key, (node, field))
            return ref_string

        if isinstance(node, LangfuseMediaReference):
            if node.reference_string:
                return node.reference_string
            return node

        # Tuples are left out on purpose: namedtuple subclasses cannot be
        # rebuilt from an iterable, so media inside them stays untouched.
        is_flat_collection = isinstance(node, (list, set, frozenset))
        if not (is_flat_collection or isinstance(node, dict)):
            return node

        # Tracking the ids of containers on the current path only guards
        # against recursive cycles.
        node_id = id(node)
        if level_exceeded(depth) or node_id in open_container_ids:
            return node

        child_container_ids = open_container_ids | {node_id}

        if is_flat_collection:
            # Rebuild with the same concrete type the caller passed in.
            return type(node)(
                _walk(child, depth + 1, child_container_ids) for child in node
            )

        rebuilt = {}
        for key, value in node.items():
            rebuilt[key] = _walk(value, depth + 1, child_container_ids)
        return rebuilt

    def level_exceeded(depth: int) -> bool:
        # Containers nested deeper than the limit are returned unchanged.
        return depth > depth_limit

    return _walk(data, 1, set())

def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem:
    """Replace media reference strings in ``item`` with ``LangfuseMediaReference`` objects.

    For every entry of ``item.media_references`` that carries media details
    and targets a known field, the value at the entry's JSONPath inside that
    field is swapped for a ``LangfuseMediaReference`` built from the entry.

    Args:
        item: The dataset item to hydrate.

    Returns:
        ``item`` itself when it has no media references, otherwise a copy
        produced by ``item.model_copy`` with ``input``, ``expected_output``
        and ``metadata`` updated.
    """
    references = item.media_references or []
    if not references:
        return item

    # Key on the API enum member and map to the snake_case model attribute, so
    # the lookup keeps working regardless of the enum's wire value
    # (e.g. "expectedOutput").
    field_to_attr = {
        DatasetItemMediaReferenceField.INPUT: "input",
        DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output",
        DatasetItemMediaReferenceField.METADATA: "metadata",
    }
    values_by_attr = {
        "input": item.input,
        "expected_output": item.expected_output,
        "metadata": item.metadata,
    }

    for reference in references:
        details = reference.media
        # Entries without media details, or pointing at a field that is not
        # mapped above, are skipped.
        target_attr = None if details is None else field_to_attr.get(reference.field)
        if target_attr is None:
            continue

        hydrated_reference = LangfuseMediaReference(
            media_id=details.media_id,
            content_type=details.content_type,
            url=details.url,
            url_expiry=details.url_expiry,
            content_length=details.content_length,
            reference_string=reference.reference_string,
        )
        # Feed the running value back in so several references that target
        # the same field accumulate.
        values_by_attr[target_attr] = self._replace_json_path_value(
            value=values_by_attr[target_attr],
            json_path=reference.json_path,
            replacement=hydrated_reference,
        )

    return item.model_copy(update=values_by_attr)

def _replace_json_path_value(
    self, *, value: Any, json_path: str, replacement: LangfuseMediaReference
) -> Any:
    """Substitute ``replacement`` at ``json_path`` inside ``value``, best effort.

    Args:
        value: The structure to update.
        json_path: JSONPath expression locating the element to replace.
        replacement: The media reference object to put in its place.

    Returns:
        The result of the parsed path's ``update`` call, or ``value`` as
        passed in when parsing or updating raises.
    """
    try:
        return parse_jsonpath(json_path).update(value, replacement)
    except Exception as error:
        # Hydration is a convenience on the read path: log and hand back the
        # value instead of failing the caller.
        langfuse_logger.warning(
            f"Failed to hydrate dataset media reference at JSONPath {json_path}",
            exc_info=error,
        )
        return value
Comment thread
wochinge marked this conversation as resolved.

def resolve_media_references(
self,
*,
Comment thread
claude[bot] marked this conversation as resolved.
Expand Down
23 changes: 23 additions & 0 deletions langfuse/_client/resource_manager.py

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual (non-generated) changes

Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,29 @@ class LangfuseResourceManager:
_instances: Dict[str, "LangfuseResourceManager"] = {}
_lock = threading.RLock()

@classmethod
def get_singleton_httpx_client(cls) -> Optional[httpx.Client]:
    """Return the httpx client of the only registered instance, if there is exactly one.

    Returns:
        The ``httpx_client`` of the single entry in ``cls._instances``;
        ``None`` when no instance is registered, and ``None`` (after logging
        a warning) when more than one is registered.
    """
    # Snapshot the registry under the lock; the decision below works on the copy.
    with cls._lock:
        registered = list(cls._instances.values())

    if len(registered) == 1:
        return registered[0].httpx_client

    if registered:
        # Mirror get_client's safety stance: with several clients there is no
        # way to tell which one produced a given reference, so fall back to a
        # default httpx client rather than silently using an arbitrary
        # instance's transport config (proxy / CA / mTLS).
        langfuse_logger.warning(
            "Multiple Langfuse clients are instantiated; falling back to a "
            "default httpx client for LangfuseMediaReference fetches. Pass an "
            "explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to "
            "honor per-client transport settings."
        )

    return None
Comment thread
claude[bot] marked this conversation as resolved.

def __new__(
cls,
*,
Expand Down
43 changes: 42 additions & 1 deletion langfuse/_task_manager/media_manager.py

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual (non-generated) changes

Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def _process_media(
content_sha256_hash=media._content_sha256_hash,
trace_id=trace_id,
observation_id=observation_id,
dataset_id=None,
dataset_item_id=None,
field=field,
)

Expand All @@ -284,6 +286,43 @@ def _process_media(
f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}"
)

def _upload_media_sync(
    self,
    *,
    media: LangfuseMedia,
    dataset_id: Optional[str] = None,
    dataset_item_id: Optional[str] = None,
    field: Optional[str] = None,
) -> None:
    """Build an upload job for ``media`` and process it right away.

    The job is passed directly to ``_process_upload_media_job``. Its
    ``trace_id`` and ``observation_id`` are always ``None``.

    Args:
        media: The media object to upload.
        dataset_id: Dataset id recorded on the upload job.
        dataset_item_id: Dataset item id recorded on the upload job.
        field: Field name recorded on the upload job.

    Raises:
        ValueError: If media upload is disabled, if ``media`` lacks content
            length, content type, hash or bytes, or if it has no media id.
    """
    if not self._enabled:
        raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.")

    # Checked in the same order as before, stopping at the first missing part.
    has_full_payload = (
        media._content_length is not None
        and media._content_type is not None
        and media._content_sha256_hash is not None
        and media._content_bytes is not None
    )
    if not has_full_payload:
        raise ValueError("Cannot upload invalid LangfuseMedia.")

    if media._media_id is None:
        raise ValueError("Cannot upload LangfuseMedia without media ID.")

    job = UploadMediaJob(
        media_id=media._media_id,
        content_bytes=media._content_bytes,
        content_type=media._content_type,
        content_length=media._content_length,
        content_sha256_hash=media._content_sha256_hash,
        trace_id=None,
        observation_id=None,
        dataset_id=dataset_id,
        dataset_item_id=dataset_item_id,
        field=field,
    )
    self._process_upload_media_job(data=job)

def _process_upload_media_job(
self,
*,
Expand All @@ -294,9 +333,11 @@ def _process_upload_media_job(
content_length=data["content_length"],
content_type=cast(MediaContentType, data["content_type"]),
sha256hash=data["content_sha256_hash"],
field=data["field"],
trace_id=data["trace_id"],
observation_id=data["observation_id"],
dataset_id=data["dataset_id"],
dataset_item_id=data["dataset_item_id"],
field=data["field"],
)

upload_url = upload_url_response.upload_url
Expand Down
Loading
Loading