-
Notifications
You must be signed in to change notification settings - Fork 298
feat(datasets): support multimodal dataset items #1710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ed19fe4
cd791b6
a9a4a96
2c6718a
592f9de
3832a4e
88905ce
d7eb3ed
7db0fe7
a2cba12
0e2ea09
310f692
dcb3fc7
6c0d12c
d4e08f2
e40e8ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| import os | ||
| import re | ||
| import urllib.parse | ||
| import uuid | ||
| import warnings | ||
| from datetime import datetime | ||
| from hashlib import sha256 | ||
|
|
@@ -19,6 +20,7 @@ | |
| List, | ||
| Literal, | ||
| Optional, | ||
| Tuple, | ||
| Type, | ||
| Union, | ||
| cast, | ||
|
|
@@ -27,6 +29,7 @@ | |
|
|
||
| import backoff | ||
| import httpx | ||
| from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped] | ||
| from opentelemetry import context as otel_context_api | ||
| from opentelemetry import trace as otel_trace_api | ||
| from opentelemetry.sdk.trace import ReadableSpan, TracerProvider | ||
|
|
@@ -94,6 +97,7 @@ | |
| CreateTextPromptRequest, | ||
| Dataset, | ||
| DatasetItem, | ||
| DatasetItemMediaReferenceField, | ||
| DatasetRunWithItems, | ||
| DatasetStatus, | ||
| DeleteDatasetRunResponse, | ||
|
|
@@ -126,7 +130,7 @@ | |
| _run_task, | ||
| ) | ||
| from langfuse.logger import langfuse_logger | ||
| from langfuse.media import LangfuseMedia | ||
| from langfuse.media import LangfuseMedia, LangfuseMediaReference | ||
| from langfuse.model import ( | ||
| ChatMessageDict, | ||
| ChatMessageWithPlaceholdersDict, | ||
|
|
@@ -2322,15 +2326,17 @@ def get_dataset( | |
| *, | ||
| fetch_items_page_size: Optional[int] = 50, | ||
| version: Optional[datetime] = None, | ||
| resolve_media_references: bool = False, | ||
| ) -> "DatasetClient": | ||
| """Fetch a dataset by its name. | ||
|
|
||
| Args: | ||
| name (str): The name of the dataset to fetch. | ||
| fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. | ||
| version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC). | ||
| name: The name of the dataset to fetch. | ||
| fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50. | ||
| version: Retrieve dataset items as they existed at this specific point in time (UTC). | ||
| If provided, returns the state of items at the specified UTC timestamp. | ||
| If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC. | ||
| resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects. | ||
|
|
||
| Returns: | ||
| DatasetClient: The dataset with the given name. | ||
|
|
@@ -2339,7 +2345,7 @@ def get_dataset( | |
| langfuse_logger.debug(f"Getting datasets {name}") | ||
| dataset = self.api.datasets.get(dataset_name=self._url_encode(name)) | ||
|
|
||
| dataset_items = [] | ||
| dataset_items: List[DatasetItem] = [] | ||
| page = 1 | ||
|
|
||
| while True: | ||
|
|
@@ -2348,8 +2354,16 @@ def get_dataset( | |
| page=page, | ||
| limit=fetch_items_page_size, | ||
| version=version, | ||
| include_media_references=resolve_media_references or None, | ||
| ) | ||
| dataset_items.extend( | ||
| [ | ||
| self._hydrate_dataset_item_media_references(item) | ||
| for item in new_items.data | ||
| ] | ||
| if resolve_media_references | ||
| else new_items.data | ||
| ) | ||
| dataset_items.extend(new_items.data) | ||
|
|
||
| if new_items.meta.total_pages <= page: | ||
| break | ||
|
|
@@ -3355,6 +3369,45 @@ def create_dataset_item( | |
| try: | ||
| langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") | ||
|
|
||
| # Media uploads must reference the (dataset, item) they belong to, and | ||
| # the item need not exist yet — so settle on the item id up front and | ||
| # reuse it for the create call below. | ||
| item_id = id if id is not None else str(uuid.uuid4()) | ||
|
|
||
| # Single pass per field: swap each LangfuseMedia for its reference | ||
| # string (derived from content, not the upload) and collect the media | ||
| # still to upload, deduped by media id and tagged with its field. | ||
| pending_media: Dict[str, Tuple[LangfuseMedia, str]] = {} | ||
| input = self._process_dataset_item_media( | ||
| data=input, | ||
| pending_media=pending_media, | ||
| field=DatasetItemMediaReferenceField.INPUT.value, | ||
| ) | ||
| expected_output = self._process_dataset_item_media( | ||
| data=expected_output, | ||
| pending_media=pending_media, | ||
| field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value, | ||
| ) | ||
| metadata = self._process_dataset_item_media( | ||
| data=metadata, | ||
| pending_media=pending_media, | ||
| field=DatasetItemMediaReferenceField.METADATA.value, | ||
| ) | ||
|
|
||
| # The upload needs the dataset id, but the create API only takes the | ||
| # name. Resolve it once, and only when there is actually media to | ||
| # upload — a plain item pays no extra datasets.get round-trip. | ||
| if pending_media: | ||
| assert self._resources is not None | ||
| dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id | ||
| for media, field in pending_media.values(): | ||
| self._resources._media_manager._upload_media_sync( | ||
| media=media, | ||
| dataset_id=dataset_id, | ||
| dataset_item_id=item_id, | ||
| field=field, | ||
| ) | ||
|
|
||
| result = self.api.dataset_items.create( | ||
| dataset_name=dataset_name, | ||
| input=input, | ||
|
|
@@ -3363,14 +3416,143 @@ def create_dataset_item( | |
| source_trace_id=source_trace_id, | ||
| source_observation_id=source_observation_id, | ||
| status=status, | ||
| id=id, | ||
| id=item_id, | ||
| ) | ||
|
|
||
| return cast(DatasetItem, result) | ||
| except Error as e: | ||
| handle_fern_exception(e) | ||
| raise e | ||
|
|
||
| def _process_dataset_item_media( | ||
| self, | ||
| *, | ||
| data: Any, | ||
| pending_media: Dict[str, Tuple[LangfuseMedia, str]], | ||
| field: str, | ||
| ) -> Any: | ||
| """Swap each ``LangfuseMedia`` for its reference string in ``data``. | ||
|
|
||
| Each replaced media is recorded in ``pending_media`` (keyed by media id, | ||
| so the same media across fields uploads once) for the caller to upload | ||
| after the dataset id has been resolved. | ||
| """ | ||
| if self._resources is None: | ||
| return data | ||
|
|
||
| max_levels = 10 | ||
|
|
||
| def _process_data_recursively( | ||
| data: Any, level: int, ancestor_container_ids: set[int] | ||
| ) -> Any: | ||
| # Avoid jsonpath-ng here: dataset writes should keep working | ||
| # under python -OO where parser docstrings may be stripped. | ||
|
Comment on lines +3448 to +3449
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In my opinion this is an okay compromise: the write path doesn't become a hard blocker for users, and it is easy to maintain. For the read path we avoid hand-rolling JSONPath parsing; users can also opt out there, so in my opinion that is fine. |
||
| if isinstance(data, LangfuseMedia): | ||
| reference_string = data._reference_string | ||
| media_id = data._media_id | ||
| if reference_string is None or media_id is None: | ||
| raise ValueError( | ||
| "Cannot create dataset item with invalid LangfuseMedia." | ||
| ) | ||
| # First field a media appears in wins; later duplicates dedupe. | ||
| pending_media.setdefault(media_id, (data, field)) | ||
| return reference_string | ||
|
|
||
| if isinstance(data, LangfuseMediaReference): | ||
| return data.reference_string if data.reference_string else data | ||
|
|
||
| # Tuples are intentionally excluded: namedtuple subclasses can't be | ||
| # rebuilt from an iterable, so media inside them is left untouched. | ||
| if not isinstance(data, (list, set, frozenset, dict)): | ||
| return data | ||
|
|
||
| # Container ids only protect against recursive cycles. | ||
| data_id = id(data) | ||
| if data_id in ancestor_container_ids or level > max_levels: | ||
| return data | ||
|
|
||
| next_ancestor_container_ids = ancestor_container_ids | {data_id} | ||
|
|
||
| if isinstance(data, (list, set, frozenset)): | ||
| processed = ( | ||
| _process_data_recursively( | ||
| item, level + 1, next_ancestor_container_ids | ||
| ) | ||
| for item in data | ||
| ) | ||
| return type(data)(processed) | ||
|
|
||
| return { | ||
| key: _process_data_recursively( | ||
| value, level + 1, next_ancestor_container_ids | ||
|
wochinge marked this conversation as resolved.
|
||
| ) | ||
| for key, value in data.items() | ||
| } | ||
|
|
||
| return _process_data_recursively(data, 1, set()) | ||
|
|
||
| def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: | ||
| media_references = item.media_references or [] | ||
| if not media_references: | ||
| return item | ||
|
|
||
| # Map the API enum member to the snake_case model attribute so this keeps | ||
| # working regardless of the enum's wire value (e.g. "expectedOutput"). | ||
| attr_by_field = { | ||
| DatasetItemMediaReferenceField.INPUT: "input", | ||
| DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output", | ||
| DatasetItemMediaReferenceField.METADATA: "metadata", | ||
| } | ||
| hydrated_fields = { | ||
| "input": item.input, | ||
| "expected_output": item.expected_output, | ||
| "metadata": item.metadata, | ||
| } | ||
|
|
||
| for media_reference in media_references: | ||
| media = media_reference.media | ||
| if media is None: | ||
| continue | ||
|
|
||
| field = attr_by_field.get(media_reference.field) | ||
| if field is None: | ||
| continue | ||
|
|
||
| replacement = LangfuseMediaReference( | ||
| media_id=media.media_id, | ||
| content_type=media.content_type, | ||
| url=media.url, | ||
| url_expiry=media.url_expiry, | ||
| content_length=media.content_length, | ||
| reference_string=media_reference.reference_string, | ||
| ) | ||
|
claude[bot] marked this conversation as resolved.
|
||
| hydrated_fields[field] = self._replace_json_path_value( | ||
| value=hydrated_fields[field], | ||
| json_path=media_reference.json_path, | ||
| replacement=replacement, | ||
| ) | ||
|
|
||
| return item.model_copy( | ||
| update={ | ||
| "input": hydrated_fields["input"], | ||
| "expected_output": hydrated_fields["expected_output"], | ||
| "metadata": hydrated_fields["metadata"], | ||
| } | ||
| ) | ||
|
|
||
| def _replace_json_path_value( | ||
| self, *, value: Any, json_path: str, replacement: LangfuseMediaReference | ||
| ) -> Any: | ||
| try: | ||
| value = parse_jsonpath(json_path).update(value, replacement) | ||
| except Exception as e: | ||
| langfuse_logger.warning( | ||
| f"Failed to hydrate dataset media reference at JSONPath {json_path}", | ||
| exc_info=e, | ||
| ) | ||
|
|
||
| return value | ||
|
wochinge marked this conversation as resolved.
|
||
|
|
||
| def resolve_media_references( | ||
| self, | ||
| *, | ||
|
claude[bot] marked this conversation as resolved.
|
||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actual (non-generated) changes. |
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actual (non-generated) changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actual (non-generated) changes.