From ed19fe4984b29c99f074a94127e80ea39c692cf3 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Mon, 15 Jun 2026 11:40:32 +0200 Subject: [PATCH 01/14] feat(api): update generated dataset media API --- langfuse/api/__init__.py | 9 ++++ langfuse/api/commons/__init__.py | 9 ++++ langfuse/api/commons/types/__init__.py | 9 ++++ langfuse/api/commons/types/dataset_item.py | 8 ++++ .../types/dataset_item_media_reference.py | 42 +++++++++++++++++ .../dataset_item_media_reference_field.py | 26 ++++++++++ .../dataset_item_media_reference_media.py | 47 +++++++++++++++++++ langfuse/api/dataset_items/client.py | 40 ++++++++++++++-- langfuse/api/dataset_items/raw_client.py | 34 +++++++++++++- langfuse/api/media/client.py | 44 ++++++++--------- langfuse/api/media/raw_client.py | 32 ++++++------- .../types/get_media_upload_url_request.py | 14 +++--- 12 files changed, 261 insertions(+), 53 deletions(-) create mode 100644 langfuse/api/commons/types/dataset_item_media_reference.py create mode 100644 langfuse/api/commons/types/dataset_item_media_reference_field.py create mode 100644 langfuse/api/commons/types/dataset_item_media_reference_media.py diff --git a/langfuse/api/__init__.py b/langfuse/api/__init__.py index 46985c0b9..256f1230b 100644 --- a/langfuse/api/__init__.py +++ b/langfuse/api/__init__.py @@ -86,6 +86,9 @@ CreateScoreValue, Dataset, DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, DatasetRun, DatasetRunItem, DatasetRunWithItems, @@ -396,6 +399,9 @@ "CreateTextPromptType": ".prompts", "Dataset": ".commons", "DatasetItem": ".commons", + "DatasetItemMediaReference": ".commons", + "DatasetItemMediaReferenceField": ".commons", + "DatasetItemMediaReferenceMedia": ".commons", "DatasetRun": ".commons", "DatasetRunItem": ".commons", "DatasetRunWithItems": ".commons", @@ -717,6 +723,9 @@ def __dir__(): "CreateTextPromptType", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/__init__.py b/langfuse/api/commons/__init__.py index 81cb57f96..a35064081 100644 --- a/langfuse/api/commons/__init__.py +++ b/langfuse/api/commons/__init__.py @@ -20,6 +20,9 @@ CreateScoreValue, Dataset, DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, DatasetRun, DatasetRunItem, DatasetRunWithItems, @@ -84,6 +87,9 @@ "CreateScoreValue": ".types", "Dataset": ".types", "DatasetItem": ".types", + "DatasetItemMediaReference": ".types", + "DatasetItemMediaReferenceField": ".types", + "DatasetItemMediaReferenceMedia": ".types", "DatasetRun": ".types", "DatasetRunItem": ".types", "DatasetRunWithItems": ".types", @@ -174,6 +180,9 @@ def __dir__(): "CreateScoreValue", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/types/__init__.py b/langfuse/api/commons/types/__init__.py index 5ce0a58cd..12faf307f 100644 --- a/langfuse/api/commons/types/__init__.py +++ b/langfuse/api/commons/types/__init__.py @@ -19,6 +19,9 @@ from .create_score_value import CreateScoreValue from .dataset import Dataset from .dataset_item import DatasetItem + from .dataset_item_media_reference import DatasetItemMediaReference + from .dataset_item_media_reference_field import DatasetItemMediaReferenceField + from .dataset_item_media_reference_media import DatasetItemMediaReferenceMedia from .dataset_run import DatasetRun from .dataset_run_item import DatasetRunItem from .dataset_run_with_items import DatasetRunWithItems @@ -78,6 +81,9 @@ "CreateScoreValue": ".create_score_value", "Dataset": ".dataset", "DatasetItem": ".dataset_item", + "DatasetItemMediaReference": ".dataset_item_media_reference", + "DatasetItemMediaReferenceField": ".dataset_item_media_reference_field", + "DatasetItemMediaReferenceMedia": ".dataset_item_media_reference_media", "DatasetRun": ".dataset_run", "DatasetRunItem": ".dataset_run_item", "DatasetRunWithItems": ".dataset_run_with_items", @@ -163,6 +169,9 @@ def __dir__(): "CreateScoreValue", "Dataset", "DatasetItem", + "DatasetItemMediaReference", + "DatasetItemMediaReferenceField", + "DatasetItemMediaReferenceMedia", "DatasetRun", "DatasetRunItem", "DatasetRunWithItems", diff --git a/langfuse/api/commons/types/dataset_item.py b/langfuse/api/commons/types/dataset_item.py index 54a13d81a..8d49f6439 100644 --- a/langfuse/api/commons/types/dataset_item.py +++ b/langfuse/api/commons/types/dataset_item.py @@ -7,6 +7,7 @@ import typing_extensions from ...core.pydantic_utilities import UniversalBaseModel from ...core.serialization import FieldMetadata +from .dataset_item_media_reference import DatasetItemMediaReference from .dataset_status import DatasetStatus @@ -52,6 +53,13 @@ class DatasetItem(UniversalBaseModel): updated_at: typing_extensions.Annotated[ dt.datetime, FieldMetadata(alias="updatedAt") ] + media_references: typing_extensions.Annotated[ + typing.Optional[typing.List[DatasetItemMediaReference]], + FieldMetadata(alias="mediaReferences"), + ] = pydantic.Field(default=None) + """ + Resolved Langfuse media references found in input, expectedOutput, and metadata. Only present when requested via includeMediaReferences. + """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( extra="allow", frozen=True diff --git a/langfuse/api/commons/types/dataset_item_media_reference.py b/langfuse/api/commons/types/dataset_item_media_reference.py new file mode 100644 index 000000000..95121fa41 --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference.py @@ -0,0 +1,42 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +import pydantic +import typing_extensions +from ...core.pydantic_utilities import UniversalBaseModel +from ...core.serialization import FieldMetadata +from .dataset_item_media_reference_field import DatasetItemMediaReferenceField +from .dataset_item_media_reference_media import DatasetItemMediaReferenceMedia + + +class DatasetItemMediaReference(UniversalBaseModel): + field: DatasetItemMediaReferenceField = pydantic.Field() + """ + The dataset item field containing the reference + """ + + reference_string: typing_extensions.Annotated[ + str, FieldMetadata(alias="referenceString") + ] = pydantic.Field() + """ + The Langfuse media reference string, e.g. `@@@langfuseMedia:type=image/png|id=...|source=bytes@@@` + """ + + json_path: typing_extensions.Annotated[str, FieldMetadata(alias="jsonPath")] = ( + pydantic.Field() + ) + """ + JSONPath of the string holding the reference within the field, e.g. `$['image']` + """ + + media: typing.Optional[DatasetItemMediaReferenceMedia] = pydantic.Field( + default=None + ) + """ + The resolved media record. Null if the referenced media does not exist or has not been uploaded successfully. + """ + + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="allow", frozen=True + ) diff --git a/langfuse/api/commons/types/dataset_item_media_reference_field.py b/langfuse/api/commons/types/dataset_item_media_reference_field.py new file mode 100644 index 000000000..6a7c3a23e --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference_field.py @@ -0,0 +1,26 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +from ...core import enum + +T_Result = typing.TypeVar("T_Result") + + +class DatasetItemMediaReferenceField(enum.StrEnum): + INPUT = "input" + EXPECTED_OUTPUT = "expected_output" + METADATA = "metadata" + + def visit( + self, + input: typing.Callable[[], T_Result], + expected_output: typing.Callable[[], T_Result], + metadata: typing.Callable[[], T_Result], + ) -> T_Result: + if self is DatasetItemMediaReferenceField.INPUT: + return input() + if self is DatasetItemMediaReferenceField.EXPECTED_OUTPUT: + return expected_output() + if self is DatasetItemMediaReferenceField.METADATA: + return metadata() diff --git a/langfuse/api/commons/types/dataset_item_media_reference_media.py b/langfuse/api/commons/types/dataset_item_media_reference_media.py new file mode 100644 index 000000000..94ca39f38 --- /dev/null +++ b/langfuse/api/commons/types/dataset_item_media_reference_media.py @@ -0,0 +1,47 @@ +# This file was auto-generated by Fern from our API Definition. + +import typing + +import pydantic +import typing_extensions +from ...core.pydantic_utilities import UniversalBaseModel +from ...core.serialization import FieldMetadata + + +class DatasetItemMediaReferenceMedia(UniversalBaseModel): + media_id: typing_extensions.Annotated[str, FieldMetadata(alias="mediaId")] = ( + pydantic.Field() + ) + """ + The unique langfuse identifier of the media record + """ + + content_type: typing_extensions.Annotated[ + str, FieldMetadata(alias="contentType") + ] = pydantic.Field() + """ + The MIME type of the media record + """ + + content_length: typing_extensions.Annotated[ + int, FieldMetadata(alias="contentLength") + ] = pydantic.Field() + """ + The size of the media record in bytes + """ + + url: str = pydantic.Field() + """ + The signed download URL of the media record + """ + + url_expiry: typing_extensions.Annotated[str, FieldMetadata(alias="urlExpiry")] = ( + pydantic.Field() + ) + """ + The expiry date and time of the download URL + """ + + model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( + extra="allow", frozen=True + ) diff --git a/langfuse/api/dataset_items/client.py b/langfuse/api/dataset_items/client.py index c44ca24d8..62416019c 100644 --- a/langfuse/api/dataset_items/client.py +++ b/langfuse/api/dataset_items/client.py @@ -103,7 +103,11 @@ def create( return _response.data def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> DatasetItem: """ Get a dataset item @@ -112,6 +116,9 @@ def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -135,7 +142,11 @@ def get( id="id", ) """ - _response = self._raw_client.get(id, request_options=request_options) + _response = self._raw_client.get( + id, + include_media_references=include_media_references, + request_options=request_options, + ) return _response.data def list( @@ -145,6 +156,7 @@ def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -166,6 +178,9 @@ def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -198,6 +213,7 @@ def list( source_trace_id=source_trace_id, source_observation_id=source_observation_id, version=version, + include_media_references=include_media_references, page=page, limit=limit, request_options=request_options, @@ -337,7 +353,11 @@ async def main() -> None: return _response.data async def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> DatasetItem: """ Get a dataset item @@ -346,6 +366,9 @@ async def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -377,7 +400,11 @@ async def main() -> None: asyncio.run(main()) """ - _response = await self._raw_client.get(id, request_options=request_options) + _response = await self._raw_client.get( + id, + include_media_references=include_media_references, + request_options=request_options, + ) return _response.data async def list( @@ -387,6 +414,7 @@ async def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -408,6 +436,9 @@ async def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -448,6 +479,7 @@ async def main() -> None: source_trace_id=source_trace_id, source_observation_id=source_observation_id, version=version, + include_media_references=include_media_references, page=page, limit=limit, request_options=request_options, diff --git a/langfuse/api/dataset_items/raw_client.py b/langfuse/api/dataset_items/raw_client.py index 6aeafb54d..9fd58b78a 100644 --- a/langfuse/api/dataset_items/raw_client.py +++ b/langfuse/api/dataset_items/raw_client.py @@ -167,7 +167,11 @@ def create( ) def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[DatasetItem]: """ Get a dataset item @@ -176,6 +180,9 @@ def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -186,6 +193,9 @@ def get( _response = self._client_wrapper.httpx_client.request( f"api/public/dataset-items/{jsonable_encoder(id)}", method="GET", + params={ + "includeMediaReferences": include_media_references, + }, request_options=request_options, ) try: @@ -273,6 +283,7 @@ def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -294,6 +305,9 @@ def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -315,6 +329,7 @@ def list( "sourceTraceId": source_trace_id, "sourceObservationId": source_observation_id, "version": serialize_datetime(version) if version is not None else None, + "includeMediaReferences": include_media_references, "page": page, "limit": limit, }, @@ -641,7 +656,11 @@ async def create( ) async def get( - self, id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + id: str, + *, + include_media_references: typing.Optional[bool] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[DatasetItem]: """ Get a dataset item @@ -650,6 +669,9 @@ async def get( ---------- id : str + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -660,6 +682,9 @@ async def get( _response = await self._client_wrapper.httpx_client.request( f"api/public/dataset-items/{jsonable_encoder(id)}", method="GET", + params={ + "includeMediaReferences": include_media_references, + }, request_options=request_options, ) try: @@ -747,6 +772,7 @@ async def list( source_trace_id: typing.Optional[str] = None, source_observation_id: typing.Optional[str] = None, version: typing.Optional[dt.datetime] = None, + include_media_references: typing.Optional[bool] = None, page: typing.Optional[int] = None, limit: typing.Optional[int] = None, request_options: typing.Optional[RequestOptions] = None, @@ -768,6 +794,9 @@ async def list( If provided, returns state of dataset at this timestamp. If not provided, returns the latest version. Requires datasetName to be specified. + include_media_references : typing.Optional[bool] + If true, resolve Langfuse media references in input, expectedOutput, and metadata to signed download URLs and include them as mediaReferences. Defaults to false. + page : typing.Optional[int] page number, starts at 1 @@ -789,6 +818,7 @@ async def list( "sourceTraceId": source_trace_id, "sourceObservationId": source_observation_id, "version": serialize_datetime(version) if version is not None else None, + "includeMediaReferences": include_media_references, "page": page, "limit": limit, }, diff --git a/langfuse/api/media/client.py b/langfuse/api/media/client.py index b22272b92..648cc72f5 100644 --- a/langfuse/api/media/client.py +++ b/langfuse/api/media/client.py @@ -138,12 +138,12 @@ def patch( def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: """ @@ -151,9 +151,6 @@ def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -162,11 +159,14 @@ def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -189,20 +189,18 @@ def get_upload_url( base_url="https://yourhost.com/path/to/api", ) client.media.get_upload_url( - trace_id="traceId", content_type=MediaContentType.IMAGE_PNG, content_length=1, sha256hash="sha256Hash", - field="field", ) """ _response = self._raw_client.get_upload_url( - trace_id=trace_id, content_type=content_type, content_length=content_length, sha256hash=sha256hash, - field=field, + trace_id=trace_id, observation_id=observation_id, + field=field, request_options=request_options, ) return _response.data @@ -349,12 +347,12 @@ async def main() -> None: async def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: """ @@ -362,9 +360,6 @@ async def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -373,11 +368,14 @@ async def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -405,23 +403,21 @@ async def get_upload_url( async def main() -> None: await client.media.get_upload_url( - trace_id="traceId", content_type=MediaContentType.IMAGE_PNG, content_length=1, sha256hash="sha256Hash", - field="field", ) asyncio.run(main()) """ _response = await self._raw_client.get_upload_url( - trace_id=trace_id, content_type=content_type, content_length=content_length, sha256hash=sha256hash, - field=field, + trace_id=trace_id, observation_id=observation_id, + field=field, request_options=request_options, ) return _response.data diff --git a/langfuse/api/media/raw_client.py b/langfuse/api/media/raw_client.py index 4cc619770..9f4fa3d81 100644 --- a/langfuse/api/media/raw_client.py +++ b/langfuse/api/media/raw_client.py @@ -251,12 +251,12 @@ def patch( def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[GetMediaUploadUrlResponse]: """ @@ -264,9 +264,6 @@ def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -275,11 +272,14 @@ def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -609,12 +609,12 @@ async def patch( async def get_upload_url( self, *, - trace_id: str, content_type: MediaContentType, content_length: int, sha256hash: str, - field: str, + trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[GetMediaUploadUrlResponse]: """ @@ -622,9 +622,6 @@ async def get_upload_url( Parameters ---------- - trace_id : str - The trace ID associated with the media record - content_type : MediaContentType content_length : int @@ -633,11 +630,14 @@ async def get_upload_url( sha256hash : str The SHA-256 hash of the media record - field : str - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + trace_id : typing.Optional[str] + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. observation_id : typing.Optional[str] - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + + field : typing.Optional[str] + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. request_options : typing.Optional[RequestOptions] Request-specific configuration. diff --git a/langfuse/api/media/types/get_media_upload_url_request.py b/langfuse/api/media/types/get_media_upload_url_request.py index 99f055847..7222fbdba 100644 --- a/langfuse/api/media/types/get_media_upload_url_request.py +++ b/langfuse/api/media/types/get_media_upload_url_request.py @@ -10,18 +10,18 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): - trace_id: typing_extensions.Annotated[str, FieldMetadata(alias="traceId")] = ( - pydantic.Field() - ) + trace_id: typing_extensions.Annotated[ + typing.Optional[str], FieldMetadata(alias="traceId") + ] = pydantic.Field(default=None) """ - The trace ID associated with the media record + The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. """ observation_id: typing_extensions.Annotated[ typing.Optional[str], FieldMetadata(alias="observationId") ] = pydantic.Field(default=None) """ - The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. """ content_type: typing_extensions.Annotated[ @@ -41,9 +41,9 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): The SHA-256 hash of the media record """ - field: str = pydantic.Field() + field: typing.Optional[str] = pydantic.Field(default=None) """ - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata` + The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( From cd791b64eee1677fdddca4070cdd289f4958969a Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Mon, 15 Jun 2026 17:35:27 +0200 Subject: [PATCH 02/14] feat(datasets): support media references Adds dataset item media upload and optional get_dataset media hydration. --- langfuse/__init__.py | 3 + langfuse/_client/client.py | 126 ++++++++++++++++++- langfuse/_task_manager/media_manager.py | 29 +++++ langfuse/_task_manager/media_upload_queue.py | 4 +- langfuse/media.py | 45 +++++++ pyproject.toml | 1 + tests/e2e/test_datasets.py | 56 +++++++++ tests/unit/test_datasets.py | 116 +++++++++++++++++ tests/unit/test_media_manager.py | 30 +++++ uv.lock | 13 +- 10 files changed, 414 insertions(+), 9 deletions(-) create mode 100644 tests/unit/test_datasets.py diff --git a/langfuse/__init__.py b/langfuse/__init__.py index 08d8325cf..c8144a0e4 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -29,6 +29,7 @@ LangfuseTool, ) from ._version import __version__ +from .media import LangfuseMedia, LangfuseMediaReference from .span_filter import ( KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES, is_default_export_span, @@ -41,6 +42,8 @@ __all__ = [ "Langfuse", + "LangfuseMedia", + "LangfuseMediaReference", "get_client", "observe", "propagate_attributes", diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 2f1c8d783..a989b7ac1 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -27,6 +27,7 @@ import backoff import httpx +from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped] from opentelemetry import context as otel_context_api from opentelemetry import trace as otel_trace_api from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -127,7 +128,7 @@ _run_task, ) from langfuse.logger import langfuse_logger -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference from langfuse.model import ( ChatMessageDict, ChatMessageWithPlaceholdersDict, @@ -2262,15 +2263,17 @@ def get_dataset( *, fetch_items_page_size: Optional[int] = 50, version: Optional[datetime] = None, + resolve_media_references: bool = False, ) -> "DatasetClient": """Fetch a dataset by its name. Args: - name (str): The name of the dataset to fetch. - fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. - version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC). + name: The name of the dataset to fetch. + fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50. + version: Retrieve dataset items as they existed at this specific point in time (UTC). If provided, returns the state of items at the specified UTC timestamp. If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC. + resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects. Returns: DatasetClient: The dataset with the given name. @@ -2279,7 +2282,7 @@ def get_dataset( langfuse_logger.debug(f"Getting datasets {name}") dataset = self.api.datasets.get(dataset_name=self._url_encode(name)) - dataset_items = [] + dataset_items: List[DatasetItem] = [] page = 1 while True: @@ -2288,8 +2291,16 @@ def get_dataset( page=page, limit=fetch_items_page_size, version=version, + include_media_references=resolve_media_references or None, + ) + dataset_items.extend( + [ + self._hydrate_dataset_item_media_references(item) + for item in new_items.data + ] + if resolve_media_references + else new_items.data ) - dataset_items.extend(new_items.data) if new_items.meta.total_pages <= page: break @@ -3295,6 +3306,17 @@ def create_dataset_item( try: langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") + uploaded_media_ids: set[str] = set() + input = self._process_dataset_item_media( + data=input, uploaded_media_ids=uploaded_media_ids + ) + expected_output = self._process_dataset_item_media( + data=expected_output, uploaded_media_ids=uploaded_media_ids + ) + metadata = self._process_dataset_item_media( + data=metadata, uploaded_media_ids=uploaded_media_ids + ) + result = self.api.dataset_items.create( dataset_name=dataset_name, input=input, @@ -3311,6 +3333,98 @@ def create_dataset_item( handle_fern_exception(e) raise e + def _process_dataset_item_media( + self, *, data: Any, uploaded_media_ids: set[str] + ) -> Any: + if self._resources is None: + return data + + for match in parse_jsonpath("$..`this`").find(data): + if not isinstance(match.value, LangfuseMedia): + continue + + data = match.full_path.update( + data, + self._upload_dataset_item_media( + media=match.value, uploaded_media_ids=uploaded_media_ids + ), + ) + + return data + + def _upload_dataset_item_media( + self, *, media: LangfuseMedia, uploaded_media_ids: set[str] + ) -> str: + reference_string = media._reference_string + media_id = media._media_id + + if reference_string is None or media_id is None: + raise ValueError("Cannot create dataset item with invalid LangfuseMedia.") + + if media_id not in uploaded_media_ids: + assert self._resources is not None + self._resources._media_manager._upload_media_sync(media=media) + uploaded_media_ids.add(media_id) + + return reference_string + + def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: + media_references = item.media_references or [] + if not media_references: + return item + + hydrated_fields = { + "input": item.input, + "expected_output": item.expected_output, + "metadata": item.metadata, + } + + for media_reference in media_references: + media = media_reference.media + if media is None: + continue + + field = media_reference.field.value + if field not in hydrated_fields: + continue + + replacement = LangfuseMediaReference( + media_id=media.media_id, + content_type=media.content_type, + url=media.url, + url_expiry=media.url_expiry, + content_length=media.content_length, + ) + hydrated_fields[field] = self._replace_json_path_value( + value=hydrated_fields[field], + json_path=media_reference.json_path, + replacement=replacement, + ) + + return item.model_copy( + update={ + "input": hydrated_fields["input"], + "expected_output": hydrated_fields["expected_output"], + "metadata": hydrated_fields["metadata"], + } + ) + + def _replace_json_path_value( + self, *, value: Any, json_path: str, replacement: LangfuseMediaReference + ) -> Any: + if json_path == "$": + return replacement + + try: + parse_jsonpath(json_path).update(value, replacement) + except Exception as e: + langfuse_logger.debug( + f"Failed to hydrate dataset media reference at JSONPath {json_path}", + exc_info=e, + ) + + return value + def resolve_media_references( self, *, diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 7a7123798..9f71c1bef 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -230,6 +230,35 @@ def _process_media( f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}" ) + def _upload_media_sync(self, *, media: LangfuseMedia) -> None: + if not self._enabled: + raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.") + + if ( + media._content_length is None + or media._content_type is None + or media._content_sha256_hash is None + or media._content_bytes is None + ): + return + + if media._media_id is None: + logger.error("Media ID is None. Skipping upload.") + return + + upload_media_job = UploadMediaJob( + media_id=media._media_id, + content_bytes=media._content_bytes, + content_type=media._content_type, + content_length=media._content_length, + content_sha256_hash=media._content_sha256_hash, + trace_id=None, + observation_id=None, + field=None, + ) + + self._process_upload_media_job(data=upload_media_job) + def _process_upload_media_job( self, *, diff --git a/langfuse/_task_manager/media_upload_queue.py b/langfuse/_task_manager/media_upload_queue.py index e4cd8ebee..aac852105 100644 --- a/langfuse/_task_manager/media_upload_queue.py +++ b/langfuse/_task_manager/media_upload_queue.py @@ -7,6 +7,6 @@ class UploadMediaJob(TypedDict): content_length: int content_bytes: bytes content_sha256_hash: str - trace_id: str + trace_id: Optional[str] observation_id: Optional[str] - field: str + field: Optional[str] diff --git a/langfuse/media.py b/langfuse/media.py index 53940382c..b20c6e52d 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -4,6 +4,8 @@ import hashlib import os import re +from dataclasses import dataclass +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, TypeVar, cast import httpx @@ -18,6 +20,49 @@ T = TypeVar("T") +@dataclass(frozen=True) +class LangfuseMediaReference: + """Resolved reference to media stored in Langfuse.""" + + media_id: str + content_type: str + url: str + url_expiry: Optional[str] = None + content_length: Optional[int] = None + + def url_is_expired(self) -> bool: + """Return whether the signed URL is already expired.""" + if self.url_expiry is None: + return False + + expiry = self.url_expiry.replace("Z", "+00:00") + + try: + expiry_datetime = datetime.fromisoformat(expiry) + except ValueError: + return False + + if expiry_datetime.tzinfo is None: + expiry_datetime = expiry_datetime.replace(tzinfo=timezone.utc) + + return expiry_datetime <= datetime.now(timezone.utc) + + def fetch_bytes(self) -> bytes: + """Fetch the media content from the signed URL.""" + response = httpx.get(self.url) + response.raise_for_status() + + return response.content + + def fetch_base64(self) -> str: + """Fetch media and return raw base64 without a data URI prefix.""" + return base64.b64encode(self.fetch_bytes()).decode() + + def fetch_data_uri(self) -> str: + """Fetch media and return it as a data URI.""" + return f"data:{self.content_type};base64,{self.fetch_base64()}" + + class LangfuseMedia: """A class for wrapping media objects for upload to Langfuse. diff --git a/pyproject.toml b/pyproject.toml index ceb7ea368..d210149f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "opentelemetry-api>=1.33.1,<2", "opentelemetry-sdk>=1.33.1,<2", "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2", + "jsonpath-ng>=1.8.0", ] [dependency-groups] diff --git a/tests/e2e/test_datasets.py b/tests/e2e/test_datasets.py index 8d575180a..3ba58e03d 100644 --- a/tests/e2e/test_datasets.py +++ b/tests/e2e/test_datasets.py @@ -1,10 +1,15 @@ import time from datetime import timedelta +from pathlib import Path from langfuse import Langfuse from langfuse.api import DatasetStatus +from langfuse.media import LangfuseMedia, LangfuseMediaReference from tests.support.utils import create_uuid, wait_for_result +SAMPLE_IMAGE_PATH = Path("static/puton.jpg") +SAMPLE_IMAGE_CONTENT_TYPE = "image/jpeg" + def test_create_and_get_dataset(): langfuse = Langfuse(debug=False) @@ -69,6 +74,57 @@ def test_create_dataset_item(): assert dataset.items[0].dataset_name == name +def test_create_and_get_dataset_item_with_media(): + langfuse = Langfuse(debug=False) + name = create_uuid() + langfuse.create_dataset(name=name) + sample_image_bytes = SAMPLE_IMAGE_PATH.read_bytes() + + created_item = langfuse.create_dataset_item( + dataset_name=name, + input={ + "question": "What is in this image?", + "image": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ), + }, + expected_output={ + "reference": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ), + }, + metadata={ + "thumbnail": LangfuseMedia( + file_path=str(SAMPLE_IMAGE_PATH), + content_type=SAMPLE_IMAGE_CONTENT_TYPE, + ) + }, + ) + + assert created_item.input["image"].startswith("@@@langfuseMedia:") + + raw_dataset = langfuse.get_dataset(name) + assert isinstance(raw_dataset.items[0].input["image"], str) + + resolved_dataset = wait_for_result( + lambda: langfuse.get_dataset(name, resolve_media_references=True), + is_result_ready=lambda dataset: isinstance( + dataset.items[0].input["image"], LangfuseMediaReference + ), + ) + + resolved_item = resolved_dataset.items[0] + assert isinstance(resolved_item.input["image"], LangfuseMediaReference) + assert isinstance( + resolved_item.expected_output["reference"], LangfuseMediaReference + ) + assert isinstance(resolved_item.metadata["thumbnail"], LangfuseMediaReference) + assert resolved_item.input["image"].content_type == SAMPLE_IMAGE_CONTENT_TYPE + assert resolved_item.input["image"].fetch_bytes() == sample_image_bytes + + def test_get_all_items(): langfuse = Langfuse(debug=False) name = create_uuid() diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py new file mode 100644 index 000000000..1819fb8f4 --- /dev/null +++ b/tests/unit/test_datasets.py @@ -0,0 +1,116 @@ +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import Mock + +from langfuse._client.client import Langfuse +from langfuse.api import ( + DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, + DatasetStatus, +) +from langfuse.media import LangfuseMedia, LangfuseMediaReference + + +def test_hydrate_dataset_item_media_references_replaces_matching_fields(): + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input={ + "image": reference_string, + "duplicate": reference_string, + "text": "keep", + }, + expected_output=[reference_string], + metadata={"nested": {"image": reference_string}}, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.INPUT, + reference_string=reference_string, + json_path="$['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ), + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT, + reference_string=reference_string, + json_path="$[0]", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ), + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.METADATA, + reference_string=reference_string, + json_path="$['nested']['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ), + ], + ) + + client = object.__new__(Langfuse) + + hydrated = client._hydrate_dataset_item_media_references(item) + + assert hydrated.input["text"] == "keep" + assert isinstance(hydrated.input["image"], LangfuseMediaReference) + assert hydrated.input["duplicate"] == reference_string + assert isinstance(hydrated.expected_output[0], LangfuseMediaReference) + assert isinstance(hydrated.metadata["nested"]["image"], LangfuseMediaReference) + assert hydrated.input["image"].media_id == "media-id" + + +def test_create_dataset_item_processes_media_before_api_call(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + root_media = LangfuseMedia(content_bytes=b"root", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + result = client.create_dataset_item( + dataset_name="dataset", + input={"image": media}, + expected_output=root_media, + metadata={"items": [media], "keep": "value"}, + ) + + assert result == "created-item" + media_manager._upload_media_sync.assert_any_call(media=media) + media_manager._upload_media_sync.assert_any_call(media=root_media) + assert media_manager._upload_media_sync.call_count == 2 + dataset_items_api.create.assert_called_once_with( + dataset_name="dataset", + input={"image": media._reference_string}, + expected_output=root_media._reference_string, + metadata={"items": [media._reference_string], "keep": "value"}, + source_trace_id=None, + source_observation_id=None, + status=None, + id=None, + ) diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 68684fac4..43b57aa98 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -142,6 +142,36 @@ def test_find_and_process_media_valid_base64_uri_is_processed(): assert not queue.empty() +def test_upload_media_sync_uploads_without_trace_context(): + media_api = Mock() + media_api.get_upload_url.return_value = SimpleNamespace( + upload_url="https://example.com/upload", + media_id=None, + ) + media_api.patch.return_value = None + + httpx_client = Mock() + httpx_client.put.return_value = _upload_response(200, "ok") + + manager = MediaManager( + api_client=SimpleNamespace(media=media_api), + httpx_client=httpx_client, + media_upload_queue=Queue(), + ) + + media = LangfuseMedia(content_bytes=b"payload", content_type="image/jpeg") + media_api.get_upload_url.return_value.media_id = media._media_id + + manager._upload_media_sync(media=media) + + media_api.get_upload_url.assert_called_once() + assert media_api.get_upload_url.call_args.kwargs["trace_id"] is None + assert media_api.get_upload_url.call_args.kwargs["observation_id"] is None + assert media_api.get_upload_url.call_args.kwargs["field"] is None + httpx_client.put.assert_called_once() + media_api.patch.assert_called_once() + + def test_find_and_process_media_data_uri_without_comma_passes_through(): queue = Queue() manager = MediaManager( diff --git a/uv.lock b/uv.lock index 7c321118f..9356aef55 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-06-04T15:30:59.411452714Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. exclude-newer-span = "P7D" [[package]] @@ -469,6 +469,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/07/02e16ed01e04a374e644b575638ec7987ae846d25ad97bcc9945a3ee4b0e/jsonpatch-1.33-py2.py3-none-any.whl", hash = "sha256:0ae28c0cd062bbd8b8ecc26d7d164fbbea9652a1a3693f3b956c1eae5145dade", size = 12898, upload-time = "2023-06-16T21:01:28.466Z" }, ] +[[package]] +name = "jsonpath-ng" +version = "1.8.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/32/58/250751940d75c8019659e15482d548a4aa3b6ce122c515102a4bfdac50e3/jsonpath_ng-1.8.0.tar.gz", hash = "sha256:54252968134b5e549ea5b872f1df1168bd7defe1a52fed5a358c194e1943ddc3", size = 74513, upload-time = "2026-02-24T14:42:06.182Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/99/33c7d78a3fb70d545fd5411ac67a651c81602cc09c9cf0df383733f068c5/jsonpath_ng-1.8.0-py3-none-any.whl", hash = "sha256:b8dde192f8af58d646fc031fac9c99fe4d00326afc4148f1f043c601a8cfe138", size = 67844, upload-time = "2026-02-28T00:53:19.637Z" }, +] + [[package]] name = "jsonpointer" version = "3.1.0" @@ -559,6 +568,7 @@ source = { editable = "." } dependencies = [ { name = "backoff" }, { name = "httpx" }, + { name = "jsonpath-ng" }, { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-sdk" }, @@ -593,6 +603,7 @@ docs = [ requires-dist = [ { name = "backoff", specifier = ">=1.10.0" }, { name = "httpx", specifier = ">=0.15.4,<1.0" }, + { name = "jsonpath-ng", specifier = ">=1.8.0" }, { name = "opentelemetry-api", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.1,<2" }, From a9a4a965f2fe08b966d13ccf638a8099d0cb0ad0 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Mon, 15 Jun 2026 18:19:17 +0200 Subject: [PATCH 03/14] fix(datasets): address media review feedback --- langfuse/_client/client.py | 38 ++++++++++++++++++------- langfuse/_task_manager/media_manager.py | 5 ++-- langfuse/media.py | 12 ++++---- tests/unit/test_datasets.py | 8 ++++-- tests/unit/test_media.py | 19 ++++++++++++- tests/unit/test_media_manager.py | 12 ++++++++ 6 files changed, 71 insertions(+), 23 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index a989b7ac1..89a801b2d 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3339,18 +3339,34 @@ def _process_dataset_item_media( if self._resources is None: return data - for match in parse_jsonpath("$..`this`").find(data): - if not isinstance(match.value, LangfuseMedia): - continue + seen = set() + max_levels = 10 + + def _process_data_recursively(data: Any, level: int) -> Any: + # Avoid jsonpath-ng here: create_dataset_item should not fail under + # python -OO for users who are not resolving media references. + if isinstance(data, LangfuseMedia): + return self._upload_dataset_item_media( + media=data, uploaded_media_ids=uploaded_media_ids + ) - data = match.full_path.update( - data, - self._upload_dataset_item_media( - media=match.value, uploaded_media_ids=uploaded_media_ids - ), - ) + if not isinstance(data, (list, dict)): + return data + + if id(data) in seen or level > max_levels: + return data + + seen.add(id(data)) + + if isinstance(data, list): + return [_process_data_recursively(item, level + 1) for item in data] + + return { + key: _process_data_recursively(value, level + 1) + for key, value in data.items() + } - return data + return _process_data_recursively(data, 1) def _upload_dataset_item_media( self, *, media: LangfuseMedia, uploaded_media_ids: set[str] @@ -3416,7 +3432,7 @@ def _replace_json_path_value( return replacement try: - parse_jsonpath(json_path).update(value, replacement) + value = parse_jsonpath(json_path).update(value, replacement) except Exception as e: langfuse_logger.debug( f"Failed to hydrate dataset media reference at JSONPath {json_path}", diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 9f71c1bef..0122176e0 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -240,11 +240,10 @@ def _upload_media_sync(self, *, media: LangfuseMedia) -> None: or media._content_sha256_hash is None or media._content_bytes is None ): - return + raise ValueError("Cannot upload invalid LangfuseMedia.") if media._media_id is None: - logger.error("Media ID is None. Skipping upload.") - return + raise ValueError("Cannot upload LangfuseMedia without media ID.") upload_media_job = UploadMediaJob( media_id=media._media_id, diff --git a/langfuse/media.py b/langfuse/media.py index b20c6e52d..13bf6ee7c 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -47,20 +47,20 @@ def url_is_expired(self) -> bool: return expiry_datetime <= datetime.now(timezone.utc) - def fetch_bytes(self) -> bytes: + def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: """Fetch the media content from the signed URL.""" - response = httpx.get(self.url) + response = httpx.get(self.url, timeout=timeout) response.raise_for_status() return response.content - def fetch_base64(self) -> str: + def fetch_base64(self, *, timeout: float = 30.0) -> str: """Fetch media and return raw base64 without a data URI prefix.""" - return base64.b64encode(self.fetch_bytes()).decode() + return base64.b64encode(self.fetch_bytes(timeout=timeout)).decode() - def fetch_data_uri(self) -> str: + def fetch_data_uri(self, *, timeout: float = 30.0) -> str: """Fetch media and return it as a data URI.""" - return f"data:{self.content_type};base64,{self.fetch_base64()}" + return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout)}" class LangfuseMedia: diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 1819fb8f4..8ae14c25d 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -92,15 +92,19 @@ def test_create_dataset_item_processes_media_before_api_call(): client = object.__new__(Langfuse) client._resources = SimpleNamespace(_media_manager=media_manager) client.api = SimpleNamespace(dataset_items=dataset_items_api) + input_data = {"image": media} + metadata = {"items": [media], "keep": "value"} result = client.create_dataset_item( dataset_name="dataset", - input={"image": media}, + input=input_data, expected_output=root_media, - metadata={"items": [media], "keep": "value"}, + metadata=metadata, ) assert result == "created-item" + assert input_data == {"image": media} + assert metadata == {"items": [media], "keep": "value"} media_manager._upload_media_sync.assert_any_call(media=media) media_manager._upload_media_sync.assert_any_call(media=root_media) assert media_manager._upload_media_sync.call_count == 2 diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 63df03920..9239ab52b 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -4,7 +4,7 @@ import pytest -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Test data SAMPLE_JPEG_BYTES = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00" @@ -107,6 +107,23 @@ def test_nonexistent_file(): assert media._content_type is None +def test_media_reference_fetch_uses_timeout(monkeypatch): + response = Mock() + response.content = b"test-bytes" + response.raise_for_status.return_value = None + httpx_get = Mock(return_value=response) + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert reference.fetch_bytes(timeout=12.5) == b"test-bytes" + httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=12.5) + + def test_resolve_media_references_uses_configured_httpx_client(): reference_string = "@@@langfuseMedia:type=image/jpeg|id=test-id|source=bytes@@@" fetch_timeout_seconds = 7 diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 43b57aa98..f856f1592 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -172,6 +172,18 @@ def test_upload_media_sync_uploads_without_trace_context(): media_api.patch.assert_called_once() +def test_upload_media_sync_rejects_invalid_media(): + manager = MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=Queue(), + ) + media = LangfuseMedia() + + with pytest.raises(ValueError, match="Cannot upload invalid LangfuseMedia"): + manager._upload_media_sync(media=media) + + def test_find_and_process_media_data_uri_without_comma_passes_through(): queue = Queue() manager = MediaManager( From 2c6718a327881f7b8e39556918f776f7d6663bd7 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 11:01:52 +0200 Subject: [PATCH 04/14] fix(datasets): refine media review fixes --- langfuse/_client/client.py | 32 +++++--- langfuse/_client/resource_manager.py | 10 +++ langfuse/media.py | 9 ++- tests/unit/test_datasets.py | 106 +++++++++++++++++++++++++++ tests/unit/test_media.py | 17 ++++- 5 files changed, 158 insertions(+), 16 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 89a801b2d..ef18ae09e 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3339,12 +3339,13 @@ def _process_dataset_item_media( if self._resources is None: return data - seen = set() max_levels = 10 - def _process_data_recursively(data: Any, level: int) -> Any: - # Avoid jsonpath-ng here: create_dataset_item should not fail under - # python -OO for users who are not resolving media references. + def _process_data_recursively( + data: Any, level: int, ancestor_container_ids: set[int] + ) -> Any: + # Avoid jsonpath-ng here: dataset writes should keep working + # under python -OO where parser docstrings may be stripped. if isinstance(data, LangfuseMedia): return self._upload_dataset_item_media( media=data, uploaded_media_ids=uploaded_media_ids @@ -3353,20 +3354,30 @@ def _process_data_recursively(data: Any, level: int) -> Any: if not isinstance(data, (list, dict)): return data - if id(data) in seen or level > max_levels: + # Container ids only protect against recursive cycles; media upload + # dedupe is handled by uploaded_media_ids. + data_id = id(data) + if data_id in ancestor_container_ids or level > max_levels: return data - seen.add(id(data)) + next_ancestor_container_ids = ancestor_container_ids | {data_id} if isinstance(data, list): - return [_process_data_recursively(item, level + 1) for item in data] + return [ + _process_data_recursively( + item, level + 1, next_ancestor_container_ids + ) + for item in data + ] return { - key: _process_data_recursively(value, level + 1) + key: _process_data_recursively( + value, level + 1, next_ancestor_container_ids + ) for key, value in data.items() } - return _process_data_recursively(data, 1) + return _process_data_recursively(data, 1, set()) def _upload_dataset_item_media( self, *, media: LangfuseMedia, uploaded_media_ids: set[str] @@ -3428,9 +3439,6 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt def _replace_json_path_value( self, *, value: Any, json_path: str, replacement: LangfuseMediaReference ) -> Any: - if json_path == "$": - return replacement - try: value = parse_jsonpath(json_path).update(value, replacement) except Exception as e: diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 2d42f6ce1..5dfcf29ba 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -79,6 +79,16 @@ class LangfuseResourceManager: _instances: Dict[str, "LangfuseResourceManager"] = {} _lock = threading.RLock() + @classmethod + def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: + with cls._lock: + instances = list(cls._instances.values()) + + if len(instances) != 1: + return None + + return instances[0].httpx_client + def __new__( cls, *, diff --git a/langfuse/media.py b/langfuse/media.py index 13bf6ee7c..8c90612dd 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -49,7 +49,14 @@ def url_is_expired(self) -> bool: def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: """Fetch the media content from the signed URL.""" - response = httpx.get(self.url, timeout=timeout) + from langfuse._client.resource_manager import LangfuseResourceManager + + httpx_client = LangfuseResourceManager.get_singleton_httpx_client() + response = ( + httpx_client.get(self.url, timeout=timeout) + if httpx_client is not None + else httpx.get(self.url, timeout=timeout) + ) response.raise_for_status() return response.content diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 8ae14c25d..fe0bcf75a 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -2,6 +2,8 @@ from types import SimpleNamespace from unittest.mock import Mock +import pytest + from langfuse._client.client import Langfuse from langfuse.api import ( DatasetItem, @@ -81,6 +83,85 @@ def test_hydrate_dataset_item_media_references_replaces_matching_fields(): assert hydrated.input["image"].media_id == "media-id" +@pytest.mark.parametrize( + ("field", "field_value", "json_path", "assert_resolved"), + [ + ( + DatasetItemMediaReferenceField.INPUT, + "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@", + "$", + lambda item: isinstance(item.input, LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.INPUT, + { + "image": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + "$['image']", + lambda item: isinstance(item.input["image"], LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.EXPECTED_OUTPUT, + ["@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@"], + "$[0]", + lambda item: isinstance( + item.expected_output[0], LangfuseMediaReference + ), + ), + ( + DatasetItemMediaReferenceField.METADATA, + { + "image'key": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + r"$['image\'key']", + lambda item: isinstance( + item.metadata["image'key"], LangfuseMediaReference + ), + ), + ], +) +def test_hydrate_dataset_item_media_references_supports_json_path_cases( + field, + field_value, + json_path, + assert_resolved, +): + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input=field_value if field == DatasetItemMediaReferenceField.INPUT else None, + expected_output=field_value + if field == DatasetItemMediaReferenceField.EXPECTED_OUTPUT + else None, + metadata=field_value if field == DatasetItemMediaReferenceField.METADATA else None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=field, + reference_string=reference_string, + json_path=json_path, + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + + hydrated = client._hydrate_dataset_item_media_references(item) + + assert assert_resolved(hydrated) + + def test_create_dataset_item_processes_media_before_api_call(): media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") root_media = LangfuseMedia(content_bytes=b"root", content_type="image/png") @@ -118,3 +199,28 @@ def test_create_dataset_item_processes_media_before_api_call(): status=None, id=None, ) + + +def test_create_dataset_item_processes_shared_media_subtrees(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + shared = {"image": media} + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item( + dataset_name="dataset", + input={"a": shared, "b": shared}, + ) + + assert shared == {"image": media} + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "a": {"image": media._reference_string}, + "b": {"image": media._reference_string}, + } diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 9239ab52b..4f42d506b 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -4,6 +4,7 @@ import pytest +from langfuse._client.resource_manager import LangfuseResourceManager from langfuse.media import LangfuseMedia, LangfuseMediaReference # Test data @@ -107,12 +108,19 @@ def test_nonexistent_file(): assert media._content_type is None -def test_media_reference_fetch_uses_timeout(monkeypatch): +def test_media_reference_fetch_uses_configured_httpx_client(monkeypatch): response = Mock() response.content = b"test-bytes" response.raise_for_status.return_value = None - httpx_get = Mock(return_value=response) + configured_httpx_client = Mock() + configured_httpx_client.get.return_value = response + httpx_get = Mock() monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=configured_httpx_client)}, + ) reference = LangfuseMediaReference( media_id="media-id", @@ -121,7 +129,10 @@ def test_media_reference_fetch_uses_timeout(monkeypatch): ) assert reference.fetch_bytes(timeout=12.5) == b"test-bytes" - httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=12.5) + configured_httpx_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=12.5 + ) + httpx_get.assert_not_called() def test_resolve_media_references_uses_configured_httpx_client(): From 3832a4e9dc9175e0f7c91a1730b4c6bb279e5bfc Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 11:25:13 +0200 Subject: [PATCH 05/14] fix(datasets): clean up media review follow-ups --- langfuse/_client/client.py | 2 +- langfuse/_client/resource_manager.py | 2 +- pyproject.toml | 2 +- tests/unit/test_datasets.py | 68 ---------------------------- tests/unit/test_media_manager.py | 30 ------------ uv.lock | 4 +- 6 files changed, 5 insertions(+), 103 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index c2c4c3da4..feef93f86 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3502,7 +3502,7 @@ def _replace_json_path_value( try: value = parse_jsonpath(json_path).update(value, replacement) except Exception as e: - langfuse_logger.debug( + langfuse_logger.warning( f"Failed to hydrate dataset media reference at JSONPath {json_path}", exc_info=e, ) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 4febe05d9..fffdd7026 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -84,7 +84,7 @@ def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: with cls._lock: instances = list(cls._instances.values()) - if len(instances) != 1: + if not instances: return None return instances[0].httpx_client diff --git a/pyproject.toml b/pyproject.toml index 2a40a432f..aadacdd62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "opentelemetry-api>=1.33.1,<2", "opentelemetry-sdk>=1.33.1,<2", "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2", - "jsonpath-ng>=1.8.0", + "jsonpath-ng>=1.8.0,<2", ] [dependency-groups] diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index fe0bcf75a..c8edb72d6 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -15,74 +15,6 @@ from langfuse.media import LangfuseMedia, LangfuseMediaReference -def test_hydrate_dataset_item_media_references_replaces_matching_fields(): - reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" - item = DatasetItem( - id="item-id", - status=DatasetStatus.ACTIVE, - input={ - "image": reference_string, - "duplicate": reference_string, - "text": "keep", - }, - expected_output=[reference_string], - metadata={"nested": {"image": reference_string}}, - dataset_id="dataset-id", - dataset_name="dataset-name", - created_at=datetime.now(timezone.utc), - updated_at=datetime.now(timezone.utc), - media_references=[ - DatasetItemMediaReference( - field=DatasetItemMediaReferenceField.INPUT, - reference_string=reference_string, - json_path="$['image']", - media=DatasetItemMediaReferenceMedia( - media_id="media-id", - content_type="image/png", - content_length=7, - url="https://example.com/image.png", - url_expiry="2026-06-15T12:00:00.000Z", - ), - ), - DatasetItemMediaReference( - field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT, - reference_string=reference_string, - json_path="$[0]", - media=DatasetItemMediaReferenceMedia( - media_id="media-id", - content_type="image/png", - content_length=7, - url="https://example.com/image.png", - url_expiry="2026-06-15T12:00:00.000Z", - ), - ), - DatasetItemMediaReference( - field=DatasetItemMediaReferenceField.METADATA, - reference_string=reference_string, - json_path="$['nested']['image']", - media=DatasetItemMediaReferenceMedia( - media_id="media-id", - content_type="image/png", - content_length=7, - url="https://example.com/image.png", - url_expiry="2026-06-15T12:00:00.000Z", - ), - ), - ], - ) - - client = object.__new__(Langfuse) - - hydrated = client._hydrate_dataset_item_media_references(item) - - assert hydrated.input["text"] == "keep" - assert isinstance(hydrated.input["image"], LangfuseMediaReference) - assert hydrated.input["duplicate"] == reference_string - assert isinstance(hydrated.expected_output[0], LangfuseMediaReference) - assert isinstance(hydrated.metadata["nested"]["image"], LangfuseMediaReference) - assert hydrated.input["image"].media_id == "media-id" - - @pytest.mark.parametrize( ("field", "field_value", "json_path", "assert_resolved"), [ diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index f06df3a21..3cf152106 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -142,36 +142,6 @@ def test_find_and_process_media_valid_base64_uri_is_processed(): assert not queue.empty() -def test_upload_media_sync_uploads_without_trace_context(): - media_api = Mock() - media_api.get_upload_url.return_value = SimpleNamespace( - upload_url="https://example.com/upload", - media_id=None, - ) - media_api.patch.return_value = None - - httpx_client = Mock() - httpx_client.put.return_value = _upload_response(200, "ok") - - manager = MediaManager( - api_client=SimpleNamespace(media=media_api), - httpx_client=httpx_client, - media_upload_queue=Queue(), - ) - - media = LangfuseMedia(content_bytes=b"payload", content_type="image/jpeg") - media_api.get_upload_url.return_value.media_id = media._media_id - - manager._upload_media_sync(media=media) - - media_api.get_upload_url.assert_called_once() - assert media_api.get_upload_url.call_args.kwargs["trace_id"] is None - assert media_api.get_upload_url.call_args.kwargs["observation_id"] is None - assert media_api.get_upload_url.call_args.kwargs["field"] is None - httpx_client.put.assert_called_once() - media_api.patch.assert_called_once() - - def test_upload_media_sync_rejects_invalid_media(): manager = MediaManager( api_client=SimpleNamespace(media=Mock()), diff --git a/uv.lock b/uv.lock index fa17776db..9a219dbfe 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-06-09T08:44:29.596356662Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. exclude-newer-span = "P7D" [[package]] @@ -603,7 +603,7 @@ docs = [ requires-dist = [ { name = "backoff", specifier = ">=1.10.0" }, { name = "httpx", specifier = ">=0.15.4,<1.0" }, - { name = "jsonpath-ng", specifier = ">=1.8.0" }, + { name = "jsonpath-ng", specifier = ">=1.8.0,<2" }, { name = "opentelemetry-api", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.33.1,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.1,<2" }, From 88905ce873ca7eb5fae8694929e20b230e86bbcf Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 15:34:38 +0200 Subject: [PATCH 06/14] fix(datasets): round-trip resolved media references Resolved LangfuseMediaReference items from get_dataset(resolve_media_ references=True) discarded the original @@@langfuseMedia:...@@@ string, so feeding them back into create_dataset_item or run_experiment serialized them as opaque dicts and orphaned the media. Persist the reference string and emit it from both _process_dataset_item_media and EventSerializer. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 4 +++ langfuse/_utils/serializer.py | 8 +++++- langfuse/media.py | 1 + tests/unit/test_datasets.py | 50 +++++++++++++++++++++++++++++++++++ tests/unit/test_serializer.py | 25 ++++++++++++++++++ 5 files changed, 87 insertions(+), 1 deletion(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index feef93f86..6a014f0a0 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3411,6 +3411,9 @@ def _process_data_recursively( media=data, uploaded_media_ids=uploaded_media_ids ) + if isinstance(data, LangfuseMediaReference): + return data.reference_string if data.reference_string else data + if not isinstance(data, (list, dict)): return data @@ -3481,6 +3484,7 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt url=media.url, url_expiry=media.url_expiry, content_length=media.content_length, + reference_string=media_reference.reference_string, ) hydrated_fields[field] = self._replace_json_path_value( value=hydrated_fields[field], diff --git a/langfuse/_utils/serializer.py b/langfuse/_utils/serializer.py index 27294bf80..135d1f625 100644 --- a/langfuse/_utils/serializer.py +++ b/langfuse/_utils/serializer.py @@ -15,7 +15,7 @@ from pydantic import BaseModel -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Attempt to import Serializable try: @@ -62,6 +62,12 @@ def _default_inner(self, obj: Any) -> Any: or f"" ) + if ( + isinstance(obj, LangfuseMediaReference) + and obj.reference_string is not None + ): + return obj.reference_string + # Check if numpy is available and if the object is a numpy scalar # If so, convert it to a Python scalar using the item() method if np is not None and isinstance(obj, np.generic): diff --git a/langfuse/media.py b/langfuse/media.py index 8c90612dd..04ee1182a 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -29,6 +29,7 @@ class LangfuseMediaReference: url: str url_expiry: Optional[str] = None content_length: Optional[int] = None + reference_string: Optional[str] = None def url_is_expired(self) -> bool: """Return whether the signed URL is already expired.""" diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index c8edb72d6..169ad7ce6 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -133,6 +133,56 @@ def test_create_dataset_item_processes_media_before_api_call(): ) +def test_create_dataset_item_roundtrips_resolved_media_reference(): + # get_dataset(resolve_media_references=True) hydrates strings into + # LangfuseMediaReference instances. Feeding such an item back into + # create_dataset_item must re-emit the original reference string, otherwise + # the dataclass is serialized as an opaque dict and the media is orphaned. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input={"image": reference_string}, + expected_output=None, + metadata=None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.INPUT, + reference_string=reference_string, + json_path="$['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + hydrated = client._hydrate_dataset_item_media_references(item) + assert isinstance(hydrated.input["image"], LangfuseMediaReference) + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input=hydrated.input) + + assert dataset_items_api.create.call_args.kwargs["input"] == { + "image": reference_string + } + media_manager._upload_media_sync.assert_not_called() + + def test_create_dataset_item_processes_shared_media_subtrees(): media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") shared = {"image": media} diff --git a/tests/unit/test_serializer.py b/tests/unit/test_serializer.py index f4c8dde86..6605485de 100644 --- a/tests/unit/test_serializer.py +++ b/tests/unit/test_serializer.py @@ -12,6 +12,7 @@ from langfuse._utils.serializer import ( EventSerializer, ) +from langfuse.media import LangfuseMediaReference class TestEnum(Enum): @@ -70,6 +71,30 @@ def test_pydantic_model(): assert json.loads(serializer.encode(model)) == {"field": "test"} +def test_langfuse_media_reference_serializes_to_reference_string(): + # Resolved references must round-trip back to their original reference string + # rather than falling through to asdict() and emitting an opaque dict. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + reference_string=reference_string, + ) + serializer = EventSerializer() + assert serializer.encode(ref) == f'"{reference_string}"' + + +def test_langfuse_media_reference_without_reference_string_falls_back_to_dict(): + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + ) + serializer = EventSerializer() + assert json.loads(serializer.encode(ref))["media_id"] == "media-id" + + def test_path(): path = Path("/tmp/test.txt") serializer = EventSerializer() From d7eb3edcce88cc3abbd25fa9d9562ca1cc37fee2 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Tue, 16 Jun 2026 16:25:23 +0200 Subject: [PATCH 07/14] fix(datasets): process media inside tuples and sets _process_dataset_item_media only recursed into list/dict, so a LangfuseMedia held in a tuple, set, or frozenset slipped through to fern's encoder and got silently base64-inlined instead of uploaded. Widen the walker to those containers and rebuild them in place. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 9 +++++---- tests/unit/test_datasets.py | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 6a014f0a0..ac732c3d8 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3414,7 +3414,7 @@ def _process_data_recursively( if isinstance(data, LangfuseMediaReference): return data.reference_string if data.reference_string else data - if not isinstance(data, (list, dict)): + if not isinstance(data, (list, tuple, set, frozenset, dict)): return data # Container ids only protect against recursive cycles; media upload @@ -3425,13 +3425,14 @@ def _process_data_recursively( next_ancestor_container_ids = ancestor_container_ids | {data_id} - if isinstance(data, list): - return [ + if isinstance(data, (list, tuple, set, frozenset)): + processed = ( _process_data_recursively( item, level + 1, next_ancestor_container_ids ) for item in data - ] + ) + return type(data)(processed) return { key: _process_data_recursively( diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 169ad7ce6..bdc6f394c 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -206,3 +206,41 @@ def test_create_dataset_item_processes_shared_media_subtrees(): "a": {"image": media._reference_string}, "b": {"image": media._reference_string}, } + + +def test_create_dataset_item_processes_media_in_tuples(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input={"images": (media,)}) + + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "images": (media._reference_string,) + } + + +def test_create_dataset_item_processes_media_in_sets(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api) + + client.create_dataset_item(dataset_name="dataset", input={"images": {media}}) + + media_manager._upload_media_sync.assert_called_once_with(media=media) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "images": {media._reference_string} + } From 7db0fe7df9bda288082ca61eda18ffc6a319c5bc Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 17 Jun 2026 12:16:18 +0200 Subject: [PATCH 08/14] fix(datasets): align media reference field with expectedOutput API rename The DatasetItemMediaReferenceField enum value changed from expected_output to expectedOutput. Decouple hydration from the wire value by mapping the enum member to the model attribute, so the rename (and any future one) no longer silently skips expected-output media references. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 12 ++++++++++-- .../types/dataset_item_media_reference_field.py | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index ac732c3d8..915b2c65c 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -95,6 +95,7 @@ CreateTextPromptRequest, Dataset, DatasetItem, + DatasetItemMediaReferenceField, DatasetRunWithItems, DatasetStatus, DeleteDatasetRunResponse, @@ -3464,6 +3465,13 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt if not media_references: return item + # Map the API enum member to the snake_case model attribute so this keeps + # working regardless of the enum's wire value (e.g. "expectedOutput"). + attr_by_field = { + DatasetItemMediaReferenceField.INPUT: "input", + DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output", + DatasetItemMediaReferenceField.METADATA: "metadata", + } hydrated_fields = { "input": item.input, "expected_output": item.expected_output, @@ -3475,8 +3483,8 @@ def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetIt if media is None: continue - field = media_reference.field.value - if field not in hydrated_fields: + field = attr_by_field.get(media_reference.field) + if field is None: continue replacement = LangfuseMediaReference( diff --git a/langfuse/api/commons/types/dataset_item_media_reference_field.py b/langfuse/api/commons/types/dataset_item_media_reference_field.py index 6a7c3a23e..9dc9df5cd 100644 --- a/langfuse/api/commons/types/dataset_item_media_reference_field.py +++ b/langfuse/api/commons/types/dataset_item_media_reference_field.py @@ -9,7 +9,7 @@ class DatasetItemMediaReferenceField(enum.StrEnum): INPUT = "input" - EXPECTED_OUTPUT = "expected_output" + EXPECTED_OUTPUT = "expectedOutput" METADATA = "metadata" def visit( From a2cba12d7dd2836bc0e5dac5d1b555566e8aa692 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 17 Jun 2026 13:42:10 +0200 Subject: [PATCH 09/14] fix(datasets): drop tuple media support to avoid namedtuple breakage Rebuilding tuples via type(data)(iterable) breaks namedtuple/NamedTuple subclasses, which take positional field args rather than an iterable. Keep list/set/frozenset handling and leave tuples untouched. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 6 ++++-- tests/unit/test_datasets.py | 19 ------------------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 915b2c65c..72dc5f0e6 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3415,7 +3415,9 @@ def _process_data_recursively( if isinstance(data, LangfuseMediaReference): return data.reference_string if data.reference_string else data - if not isinstance(data, (list, tuple, set, frozenset, dict)): + # Tuples are intentionally excluded: namedtuple subclasses can't be + # rebuilt from an iterable, so media inside them is left untouched. + if not isinstance(data, (list, set, frozenset, dict)): return data # Container ids only protect against recursive cycles; media upload @@ -3426,7 +3428,7 @@ def _process_data_recursively( next_ancestor_container_ids = ancestor_container_ids | {data_id} - if isinstance(data, (list, tuple, set, frozenset)): + if isinstance(data, (list, set, frozenset)): processed = ( _process_data_recursively( item, level + 1, next_ancestor_container_ids diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index bdc6f394c..78fb87094 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -208,25 +208,6 @@ def test_create_dataset_item_processes_shared_media_subtrees(): } -def test_create_dataset_item_processes_media_in_tuples(): - media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") - - media_manager = Mock() - dataset_items_api = Mock() - dataset_items_api.create.return_value = "created-item" - - client = object.__new__(Langfuse) - client._resources = SimpleNamespace(_media_manager=media_manager) - client.api = SimpleNamespace(dataset_items=dataset_items_api) - - client.create_dataset_item(dataset_name="dataset", input={"images": (media,)}) - - media_manager._upload_media_sync.assert_called_once_with(media=media) - assert dataset_items_api.create.call_args.kwargs["input"] == { - "images": (media._reference_string,) - } - - def test_create_dataset_item_processes_media_in_sets(): media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") From 0e2ea09e4db72751b3b7924c3c8f680af3214f1e Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Wed, 17 Jun 2026 15:57:31 +0200 Subject: [PATCH 10/14] fix(media): honor per-client httpx config for media reference fetches get_singleton_httpx_client silently returned the first-inserted instance, so a LangfuseMediaReference fetched from one client could go out through another client's transport (proxy/CA/mTLS). Mirror get_client: warn and fall back to default httpx when multiple clients exist, and let fetch_bytes/fetch_base64/fetch_data_uri take an explicit httpx client to deterministically honor per-client settings. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/resource_manager.py | 13 +++++ langfuse/media.py | 42 ++++++++++++---- tests/unit/test_media.py | 72 ++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 9 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index fffdd7026..ab8416dcb 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -87,6 +87,19 @@ def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: if not instances: return None + if len(instances) > 1: + # Mirror get_client's safety stance: with multiple clients we + # cannot tell which one produced a given reference, so fall back + # to a default httpx client rather than silently using an + # arbitrary instance's transport config (proxy / CA / mTLS). + langfuse_logger.warning( + "Multiple Langfuse clients are instantiated; falling back to a " + "default httpx client for LangfuseMediaReference fetches. Pass an " + "explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to " + "honor per-client transport settings." + ) + return None + return instances[0].httpx_client def __new__( diff --git a/langfuse/media.py b/langfuse/media.py index 04ee1182a..5cc7d15ea 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -48,11 +48,23 @@ def url_is_expired(self) -> bool: return expiry_datetime <= datetime.now(timezone.utc) - def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: - """Fetch the media content from the signed URL.""" + def fetch_bytes( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> bytes: + """Fetch the media content from the signed URL. + + Args: + timeout: Request timeout in seconds. + client: Optional httpx client to use for the request. Pass this to + honor custom transport settings (proxy, CA bundle, mTLS) — in + particular when multiple Langfuse clients are configured, since + the SDK cannot otherwise tell which client produced this + reference. When omitted, the single configured client is used, + falling back to a default httpx client. + """ from langfuse._client.resource_manager import LangfuseResourceManager - httpx_client = LangfuseResourceManager.get_singleton_httpx_client() + httpx_client = client or LangfuseResourceManager.get_singleton_httpx_client() response = ( httpx_client.get(self.url, timeout=timeout) if httpx_client is not None @@ -62,13 +74,25 @@ def fetch_bytes(self, *, timeout: float = 30.0) -> bytes: return response.content - def fetch_base64(self, *, timeout: float = 30.0) -> str: - """Fetch media and return raw base64 without a data URI prefix.""" - return base64.b64encode(self.fetch_bytes(timeout=timeout)).decode() + def fetch_base64( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return raw base64 without a data URI prefix. - def fetch_data_uri(self, *, timeout: float = 30.0) -> str: - """Fetch media and return it as a data URI.""" - return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout)}" + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return base64.b64encode( + self.fetch_bytes(timeout=timeout, client=client) + ).decode() + + def fetch_data_uri( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return it as a data URI. + + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout, client=client)}" class LangfuseMedia: diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 4f42d506b..d4b3c86cc 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -135,6 +135,78 @@ def test_media_reference_fetch_uses_configured_httpx_client(monkeypatch): httpx_get.assert_not_called() +def test_media_reference_fetch_uses_explicit_client(monkeypatch): + response = Mock() + response.content = b"explicit-bytes" + response.raise_for_status.return_value = None + explicit_client = Mock() + explicit_client.get.return_value = response + + singleton_client = Mock() + httpx_get = Mock() + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=singleton_client)}, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert ( + reference.fetch_bytes(timeout=5.0, client=explicit_client) == b"explicit-bytes" + ) + explicit_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=5.0 + ) + # Explicit client wins over the configured singleton and the default httpx. + singleton_client.get.assert_not_called() + httpx_get.assert_not_called() + + +def test_media_reference_fetch_falls_back_to_default_with_multiple_clients( + monkeypatch, caplog +): + import logging + + response = Mock() + response.content = b"default-bytes" + response.raise_for_status.return_value = None + httpx_get = Mock(return_value=response) + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + + client_a = Mock() + client_b = Mock() + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + { + "pk-a": SimpleNamespace(httpx_client=client_a), + "pk-b": SimpleNamespace(httpx_client=client_b), + }, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + with caplog.at_level(logging.WARNING, logger="langfuse"): + assert reference.fetch_bytes(timeout=8.0) == b"default-bytes" + + # Ambiguous multi-client setup: warn and fall back to the default httpx + # instead of silently using an arbitrary instance's transport config. + assert "Multiple Langfuse clients" in caplog.text + httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=8.0) + client_a.get.assert_not_called() + client_b.get.assert_not_called() + + def test_resolve_media_references_uses_configured_httpx_client(): reference_string = "@@@langfuseMedia:type=image/jpeg|id=test-id|source=bytes@@@" fetch_timeout_seconds = 7 From 310f692ad50b4f87fb430237d2e901959814b2e7 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Fri, 19 Jun 2026 14:14:18 +0200 Subject: [PATCH 11/14] feat(datasets): scope media uploads to their dataset item Dataset item media uploads now send datasetId + datasetItemId + field instead of a trace context. create_dataset_item settles the item id up front (generating one when absent) so media can be linked before the item exists, threads the field and resolves the dataset id for each upload, and reuses the id for the create call. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 55 ++++++++++++++++--- langfuse/_task_manager/media_manager.py | 19 ++++++- langfuse/_task_manager/media_upload_queue.py | 2 + langfuse/api/media/client.py | 32 +++++++++-- langfuse/api/media/raw_client.py | 32 +++++++++-- .../types/get_media_upload_url_request.py | 20 ++++++- tests/unit/test_datasets.py | 41 +++++++++++--- 7 files changed, 166 insertions(+), 35 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 72dc5f0e6..a48e6e162 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -8,6 +8,7 @@ import os import re import urllib.parse +import uuid import warnings from datetime import datetime from hashlib import sha256 @@ -3367,15 +3368,32 @@ def create_dataset_item( try: langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") + # Media uploads must reference the (dataset, item) they belong to, and + # the item need not exist yet — so settle on the item id up front and + # reuse it for the create call below. + item_id = id if id is not None else str(uuid.uuid4()) + uploaded_media_ids: set[str] = set() input = self._process_dataset_item_media( - data=input, uploaded_media_ids=uploaded_media_ids + data=input, + uploaded_media_ids=uploaded_media_ids, + dataset_name=dataset_name, + dataset_item_id=item_id, + field=DatasetItemMediaReferenceField.INPUT.value, ) expected_output = self._process_dataset_item_media( - data=expected_output, uploaded_media_ids=uploaded_media_ids + data=expected_output, + uploaded_media_ids=uploaded_media_ids, + dataset_name=dataset_name, + dataset_item_id=item_id, + field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value, ) metadata = self._process_dataset_item_media( - data=metadata, uploaded_media_ids=uploaded_media_ids + data=metadata, + uploaded_media_ids=uploaded_media_ids, + dataset_name=dataset_name, + dataset_item_id=item_id, + field=DatasetItemMediaReferenceField.METADATA.value, ) result = self.api.dataset_items.create( @@ -3386,7 +3404,7 @@ def create_dataset_item( source_trace_id=source_trace_id, source_observation_id=source_observation_id, status=status, - id=id, + id=item_id, ) return cast(DatasetItem, result) @@ -3395,7 +3413,13 @@ def create_dataset_item( raise e def _process_dataset_item_media( - self, *, data: Any, uploaded_media_ids: set[str] + self, + *, + data: Any, + uploaded_media_ids: set[str], + dataset_name: str, + dataset_item_id: str, + field: str, ) -> Any: if self._resources is None: return data @@ -3409,7 +3433,11 @@ def _process_data_recursively( # under python -OO where parser docstrings may be stripped. if isinstance(data, LangfuseMedia): return self._upload_dataset_item_media( - media=data, uploaded_media_ids=uploaded_media_ids + media=data, + uploaded_media_ids=uploaded_media_ids, + dataset_name=dataset_name, + dataset_item_id=dataset_item_id, + field=field, ) if isinstance(data, LangfuseMediaReference): @@ -3447,7 +3475,13 @@ def _process_data_recursively( return _process_data_recursively(data, 1, set()) def _upload_dataset_item_media( - self, *, media: LangfuseMedia, uploaded_media_ids: set[str] + self, + *, + media: LangfuseMedia, + uploaded_media_ids: set[str], + dataset_name: str, + dataset_item_id: str, + field: str, ) -> str: reference_string = media._reference_string media_id = media._media_id @@ -3457,7 +3491,12 @@ def _upload_dataset_item_media( if media_id not in uploaded_media_ids: assert self._resources is not None - self._resources._media_manager._upload_media_sync(media=media) + self._resources._media_manager._upload_media_sync( + media=media, + dataset_id=self.api.datasets.get(dataset_name).id, + dataset_item_id=dataset_item_id, + field=field, + ) uploaded_media_ids.add(media_id) return reference_string diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 18aaa6951..e1882fb4c 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -263,6 +263,8 @@ def _process_media( content_sha256_hash=media._content_sha256_hash, trace_id=trace_id, observation_id=observation_id, + dataset_id=None, + dataset_item_id=None, field=field, ) @@ -284,7 +286,14 @@ def _process_media( f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}" ) - def _upload_media_sync(self, *, media: LangfuseMedia) -> None: + def _upload_media_sync( + self, + *, + media: LangfuseMedia, + dataset_id: Optional[str] = None, + dataset_item_id: Optional[str] = None, + field: Optional[str] = None, + ) -> None: if not self._enabled: raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.") @@ -307,7 +316,9 @@ def _upload_media_sync(self, *, media: LangfuseMedia) -> None: content_sha256_hash=media._content_sha256_hash, trace_id=None, observation_id=None, - field=None, + dataset_id=dataset_id, + dataset_item_id=dataset_item_id, + field=field, ) self._process_upload_media_job(data=upload_media_job) @@ -322,9 +333,11 @@ def _process_upload_media_job( content_length=data["content_length"], content_type=cast(MediaContentType, data["content_type"]), sha256hash=data["content_sha256_hash"], - field=data["field"], trace_id=data["trace_id"], observation_id=data["observation_id"], + dataset_id=data["dataset_id"], + dataset_item_id=data["dataset_item_id"], + field=data["field"], ) upload_url = upload_url_response.upload_url diff --git a/langfuse/_task_manager/media_upload_queue.py b/langfuse/_task_manager/media_upload_queue.py index aac852105..a0ef4bdf1 100644 --- a/langfuse/_task_manager/media_upload_queue.py +++ b/langfuse/_task_manager/media_upload_queue.py @@ -9,4 +9,6 @@ class UploadMediaJob(TypedDict): content_sha256_hash: str trace_id: Optional[str] observation_id: Optional[str] + dataset_id: Optional[str] + dataset_item_id: Optional[str] field: Optional[str] diff --git a/langfuse/api/media/client.py b/langfuse/api/media/client.py index 648cc72f5..02dd986b7 100644 --- a/langfuse/api/media/client.py +++ b/langfuse/api/media/client.py @@ -143,6 +143,8 @@ def get_upload_url( sha256hash: str, trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + dataset_id: typing.Optional[str] = OMIT, + dataset_item_id: typing.Optional[str] = OMIT, field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: @@ -160,13 +162,19 @@ def get_upload_url( The SHA-256 hash of the media record trace_id : typing.Optional[str] - The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. + The trace the media is associated with. Null for dataset item media uploads. observation_id : typing.Optional[str] - The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + + dataset_id : typing.Optional[str] + The dataset the media belongs to. Null for trace/observation media uploads. + + dataset_item_id : typing.Optional[str] + The dataset item the media is associated with (need not exist yet). Null for trace/observation media uploads. field : typing.Optional[str] - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. + The item field the media is in: `input`/`output`/`metadata` (trace) or `input`/`expectedOutput`/`metadata` (dataset item). request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -200,6 +208,8 @@ def get_upload_url( sha256hash=sha256hash, trace_id=trace_id, observation_id=observation_id, + dataset_id=dataset_id, + dataset_item_id=dataset_item_id, field=field, request_options=request_options, ) @@ -352,6 +362,8 @@ async def get_upload_url( sha256hash: str, trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + dataset_id: typing.Optional[str] = OMIT, + dataset_item_id: typing.Optional[str] = OMIT, field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> GetMediaUploadUrlResponse: @@ -369,13 +381,19 @@ async def get_upload_url( The SHA-256 hash of the media record trace_id : typing.Optional[str] - The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. + The trace the media is associated with. Null for dataset item media uploads. observation_id : typing.Optional[str] - The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + + dataset_id : typing.Optional[str] + The dataset the media belongs to. Null for trace/observation media uploads. + + dataset_item_id : typing.Optional[str] + The dataset item the media is associated with (need not exist yet). Null for trace/observation media uploads. field : typing.Optional[str] - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. + The item field the media is in: `input`/`output`/`metadata` (trace) or `input`/`expectedOutput`/`metadata` (dataset item). request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -417,6 +435,8 @@ async def main() -> None: sha256hash=sha256hash, trace_id=trace_id, observation_id=observation_id, + dataset_id=dataset_id, + dataset_item_id=dataset_item_id, field=field, request_options=request_options, ) diff --git a/langfuse/api/media/raw_client.py b/langfuse/api/media/raw_client.py index 9f4fa3d81..7bbdbe695 100644 --- a/langfuse/api/media/raw_client.py +++ b/langfuse/api/media/raw_client.py @@ -256,6 +256,8 @@ def get_upload_url( sha256hash: str, trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + dataset_id: typing.Optional[str] = OMIT, + dataset_item_id: typing.Optional[str] = OMIT, field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[GetMediaUploadUrlResponse]: @@ -273,13 +275,19 @@ def get_upload_url( The SHA-256 hash of the media record trace_id : typing.Optional[str] - The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. + The trace the media is associated with. Null for dataset item media uploads. observation_id : typing.Optional[str] - The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + + dataset_id : typing.Optional[str] + The dataset the media belongs to. Null for trace/observation media uploads. + + dataset_item_id : typing.Optional[str] + The dataset item the media is associated with (need not exist yet). Null for trace/observation media uploads. field : typing.Optional[str] - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. + The item field the media is in: `input`/`output`/`metadata` (trace) or `input`/`expectedOutput`/`metadata` (dataset item). request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -294,6 +302,8 @@ def get_upload_url( json={ "traceId": trace_id, "observationId": observation_id, + "datasetId": dataset_id, + "datasetItemId": dataset_item_id, "contentType": content_type, "contentLength": content_length, "sha256Hash": sha256hash, @@ -614,6 +624,8 @@ async def get_upload_url( sha256hash: str, trace_id: typing.Optional[str] = OMIT, observation_id: typing.Optional[str] = OMIT, + dataset_id: typing.Optional[str] = OMIT, + dataset_item_id: typing.Optional[str] = OMIT, field: typing.Optional[str] = OMIT, request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[GetMediaUploadUrlResponse]: @@ -631,13 +643,19 @@ async def get_upload_url( The SHA-256 hash of the media record trace_id : typing.Optional[str] - The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. + The trace the media is associated with. Null for dataset item media uploads. observation_id : typing.Optional[str] - The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + + dataset_id : typing.Optional[str] + The dataset the media belongs to. Null for trace/observation media uploads. + + dataset_item_id : typing.Optional[str] + The dataset item the media is associated with (need not exist yet). Null for trace/observation media uploads. field : typing.Optional[str] - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. + The item field the media is in: `input`/`output`/`metadata` (trace) or `input`/`expectedOutput`/`metadata` (dataset item). request_options : typing.Optional[RequestOptions] Request-specific configuration. @@ -652,6 +670,8 @@ async def get_upload_url( json={ "traceId": trace_id, "observationId": observation_id, + "datasetId": dataset_id, + "datasetItemId": dataset_item_id, "contentType": content_type, "contentLength": content_length, "sha256Hash": sha256hash, diff --git a/langfuse/api/media/types/get_media_upload_url_request.py b/langfuse/api/media/types/get_media_upload_url_request.py index 7222fbdba..702d7f534 100644 --- a/langfuse/api/media/types/get_media_upload_url_request.py +++ b/langfuse/api/media/types/get_media_upload_url_request.py @@ -14,14 +14,28 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): typing.Optional[str], FieldMetadata(alias="traceId") ] = pydantic.Field(default=None) """ - The trace ID associated with the media record. If null, the media record is not associated with a trace, e.g. when uploading media for dataset items. + The trace the media is associated with. Null for dataset item media uploads. """ observation_id: typing_extensions.Annotated[ typing.Optional[str], FieldMetadata(alias="observationId") ] = pydantic.Field(default=None) """ - The observation ID associated with the media record. If provided, traceId must be provided as well. If the media record is associated directly with a trace, this will be null. + The observation ID associated with the media record. If the media record is associated directly with a trace, this will be null. + """ + + dataset_id: typing_extensions.Annotated[ + typing.Optional[str], FieldMetadata(alias="datasetId") + ] = pydantic.Field(default=None) + """ + The dataset the media belongs to. Null for trace/observation media uploads. + """ + + dataset_item_id: typing_extensions.Annotated[ + typing.Optional[str], FieldMetadata(alias="datasetItemId") + ] = pydantic.Field(default=None) + """ + The dataset item the media is associated with (need not exist yet). Null for trace/observation media uploads. """ content_type: typing_extensions.Annotated[ @@ -43,7 +57,7 @@ class GetMediaUploadUrlRequest(UniversalBaseModel): field: typing.Optional[str] = pydantic.Field(default=None) """ - The trace / observation field the media record is associated with. This can be one of `input`, `output`, `metadata`. Required if traceId is provided, ignored otherwise. + The item field the media is in: `input`/`output`/`metadata` (trace) or `input`/`expectedOutput`/`metadata` (dataset item). """ model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict( diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 78fb87094..098611b9f 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -101,10 +101,12 @@ def test_create_dataset_item_processes_media_before_api_call(): media_manager = Mock() dataset_items_api = Mock() dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") client = object.__new__(Langfuse) client._resources = SimpleNamespace(_media_manager=media_manager) - client.api = SimpleNamespace(dataset_items=dataset_items_api) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) input_data = {"image": media} metadata = {"items": [media], "keep": "value"} @@ -113,13 +115,23 @@ def test_create_dataset_item_processes_media_before_api_call(): input=input_data, expected_output=root_media, metadata=metadata, + id="item-id", ) assert result == "created-item" assert input_data == {"image": media} assert metadata == {"items": [media], "keep": "value"} - media_manager._upload_media_sync.assert_any_call(media=media) - media_manager._upload_media_sync.assert_any_call(media=root_media) + # Each upload carries the dataset id (resolved from the name) plus the item + # id and the field the media lives in. + media_manager._upload_media_sync.assert_any_call( + media=media, dataset_id="dataset-id", dataset_item_id="item-id", field="input" + ) + media_manager._upload_media_sync.assert_any_call( + media=root_media, + dataset_id="dataset-id", + dataset_item_id="item-id", + field="expectedOutput", + ) assert media_manager._upload_media_sync.call_count == 2 dataset_items_api.create.assert_called_once_with( dataset_name="dataset", @@ -129,7 +141,7 @@ def test_create_dataset_item_processes_media_before_api_call(): source_trace_id=None, source_observation_id=None, status=None, - id=None, + id="item-id", ) @@ -190,18 +202,23 @@ def test_create_dataset_item_processes_shared_media_subtrees(): media_manager = Mock() dataset_items_api = Mock() dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") client = object.__new__(Langfuse) client._resources = SimpleNamespace(_media_manager=media_manager) - client.api = SimpleNamespace(dataset_items=dataset_items_api) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) client.create_dataset_item( dataset_name="dataset", input={"a": shared, "b": shared}, + id="item-id", ) assert shared == {"image": media} - media_manager._upload_media_sync.assert_called_once_with(media=media) + media_manager._upload_media_sync.assert_called_once_with( + media=media, dataset_id="dataset-id", dataset_item_id="item-id", field="input" + ) assert dataset_items_api.create.call_args.kwargs["input"] == { "a": {"image": media._reference_string}, "b": {"image": media._reference_string}, @@ -214,14 +231,20 @@ def test_create_dataset_item_processes_media_in_sets(): media_manager = Mock() dataset_items_api = Mock() dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") client = object.__new__(Langfuse) client._resources = SimpleNamespace(_media_manager=media_manager) - client.api = SimpleNamespace(dataset_items=dataset_items_api) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) - client.create_dataset_item(dataset_name="dataset", input={"images": {media}}) + client.create_dataset_item( + dataset_name="dataset", input={"images": {media}}, id="item-id" + ) - media_manager._upload_media_sync.assert_called_once_with(media=media) + media_manager._upload_media_sync.assert_called_once_with( + media=media, dataset_id="dataset-id", dataset_item_id="item-id", field="input" + ) assert dataset_items_api.create.call_args.kwargs["input"] == { "images": {media._reference_string} } From 6c0d12c62990942b71396616d1b25867d5736d1d Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Fri, 19 Jun 2026 14:57:38 +0200 Subject: [PATCH 12/14] fix(datasets): resolve dataset id once per create_dataset_item Resolving the dataset id inside the per-media upload path fired one datasets.get per distinct media and could orphan already-uploaded media if a later lookup failed. Resolve it once up front, before any upload, and thread dataset_id through the media processing. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 18 ++++++++++-------- tests/unit/test_datasets.py | 7 ++++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index a48e6e162..bcc032a40 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3370,28 +3370,30 @@ def create_dataset_item( # Media uploads must reference the (dataset, item) they belong to, and # the item need not exist yet — so settle on the item id up front and - # reuse it for the create call below. + # reuse it for the create call below. The dataset id is invariant for + # the call, so resolve it once here rather than per uploaded media. item_id = id if id is not None else str(uuid.uuid4()) + dataset_id = self.api.datasets.get(dataset_name).id uploaded_media_ids: set[str] = set() input = self._process_dataset_item_media( data=input, uploaded_media_ids=uploaded_media_ids, - dataset_name=dataset_name, + dataset_id=dataset_id, dataset_item_id=item_id, field=DatasetItemMediaReferenceField.INPUT.value, ) expected_output = self._process_dataset_item_media( data=expected_output, uploaded_media_ids=uploaded_media_ids, - dataset_name=dataset_name, + dataset_id=dataset_id, dataset_item_id=item_id, field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value, ) metadata = self._process_dataset_item_media( data=metadata, uploaded_media_ids=uploaded_media_ids, - dataset_name=dataset_name, + dataset_id=dataset_id, dataset_item_id=item_id, field=DatasetItemMediaReferenceField.METADATA.value, ) @@ -3417,7 +3419,7 @@ def _process_dataset_item_media( *, data: Any, uploaded_media_ids: set[str], - dataset_name: str, + dataset_id: str, dataset_item_id: str, field: str, ) -> Any: @@ -3435,7 +3437,7 @@ def _process_data_recursively( return self._upload_dataset_item_media( media=data, uploaded_media_ids=uploaded_media_ids, - dataset_name=dataset_name, + dataset_id=dataset_id, dataset_item_id=dataset_item_id, field=field, ) @@ -3479,7 +3481,7 @@ def _upload_dataset_item_media( *, media: LangfuseMedia, uploaded_media_ids: set[str], - dataset_name: str, + dataset_id: str, dataset_item_id: str, field: str, ) -> str: @@ -3493,7 +3495,7 @@ def _upload_dataset_item_media( assert self._resources is not None self._resources._media_manager._upload_media_sync( media=media, - dataset_id=self.api.datasets.get(dataset_name).id, + dataset_id=dataset_id, dataset_item_id=dataset_item_id, field=field, ) diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 098611b9f..342b8d8fa 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -133,6 +133,9 @@ def test_create_dataset_item_processes_media_before_api_call(): field="expectedOutput", ) assert media_manager._upload_media_sync.call_count == 2 + # The dataset id is invariant for the call, so it is resolved exactly once + # regardless of how many distinct media the item carries. + datasets_api.get.assert_called_once_with("dataset") dataset_items_api.create.assert_called_once_with( dataset_name="dataset", input={"image": media._reference_string}, @@ -184,8 +187,10 @@ def test_create_dataset_item_roundtrips_resolved_media_reference(): media_manager = Mock() dataset_items_api = Mock() dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") client._resources = SimpleNamespace(_media_manager=media_manager) - client.api = SimpleNamespace(dataset_items=dataset_items_api) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) client.create_dataset_item(dataset_name="dataset", input=hydrated.input) From d4e08f29a4a5f2a7db795105bb338159601340aa Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Fri, 19 Jun 2026 15:21:36 +0200 Subject: [PATCH 13/14] fix(datasets): url-encode dataset name and fix media upload job fixture URL-encode the dataset name in the create_dataset_item media lookup to match the other datasets.* call sites, and add the new dataset_id / dataset_item_id keys to the _upload_job test fixture so _process_upload_media_job no longer KeyErrors. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 2 +- tests/unit/test_media_manager.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index bcc032a40..c7d9acb3f 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -3373,7 +3373,7 @@ def create_dataset_item( # reuse it for the create call below. The dataset id is invariant for # the call, so resolve it once here rather than per uploaded media. item_id = id if id is not None else str(uuid.uuid4()) - dataset_id = self.api.datasets.get(dataset_name).id + dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id uploaded_media_ids: set[str] = set() input = self._process_dataset_item_media( diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 3cf152106..3ab4e3226 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -23,6 +23,8 @@ def _upload_job() -> dict: "content_sha256_hash": "sha256hash", "trace_id": "trace-id", "observation_id": None, + "dataset_id": None, + "dataset_item_id": None, "field": "input", } From e40e8eaf9183d6d1f151c80be8dd22be3f2933e5 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Fri, 19 Jun 2026 15:32:27 +0200 Subject: [PATCH 14/14] refactor(datasets): collect media in one pass, resolve dataset id lazily Replace each LangfuseMedia with its reference string and collect the media to upload in a single traversal per field, then resolve the dataset id once and upload the collected set. A plain item no longer does any datasets.get; media items do exactly one, before any upload. Co-Authored-By: Claude Opus 4.8 (1M context) --- langfuse/_client/client.py | 92 ++++++++++++++++--------------------- tests/unit/test_datasets.py | 22 +++++++++ 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index c7d9acb3f..f1dbb69f3 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -20,6 +20,7 @@ List, Literal, Optional, + Tuple, Type, Union, cast, @@ -3370,34 +3371,43 @@ def create_dataset_item( # Media uploads must reference the (dataset, item) they belong to, and # the item need not exist yet — so settle on the item id up front and - # reuse it for the create call below. The dataset id is invariant for - # the call, so resolve it once here rather than per uploaded media. + # reuse it for the create call below. item_id = id if id is not None else str(uuid.uuid4()) - dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id - uploaded_media_ids: set[str] = set() + # Single pass per field: swap each LangfuseMedia for its reference + # string (derived from content, not the upload) and collect the media + # still to upload, deduped by media id and tagged with its field. + pending_media: Dict[str, Tuple[LangfuseMedia, str]] = {} input = self._process_dataset_item_media( data=input, - uploaded_media_ids=uploaded_media_ids, - dataset_id=dataset_id, - dataset_item_id=item_id, + pending_media=pending_media, field=DatasetItemMediaReferenceField.INPUT.value, ) expected_output = self._process_dataset_item_media( data=expected_output, - uploaded_media_ids=uploaded_media_ids, - dataset_id=dataset_id, - dataset_item_id=item_id, + pending_media=pending_media, field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value, ) metadata = self._process_dataset_item_media( data=metadata, - uploaded_media_ids=uploaded_media_ids, - dataset_id=dataset_id, - dataset_item_id=item_id, + pending_media=pending_media, field=DatasetItemMediaReferenceField.METADATA.value, ) + # The upload needs the dataset id, but the create API only takes the + # name. Resolve it once, and only when there is actually media to + # upload — a plain item pays no extra datasets.get round-trip. + if pending_media: + assert self._resources is not None + dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id + for media, field in pending_media.values(): + self._resources._media_manager._upload_media_sync( + media=media, + dataset_id=dataset_id, + dataset_item_id=item_id, + field=field, + ) + result = self.api.dataset_items.create( dataset_name=dataset_name, input=input, @@ -3418,11 +3428,15 @@ def _process_dataset_item_media( self, *, data: Any, - uploaded_media_ids: set[str], - dataset_id: str, - dataset_item_id: str, + pending_media: Dict[str, Tuple[LangfuseMedia, str]], field: str, ) -> Any: + """Swap each ``LangfuseMedia`` for its reference string in ``data``. + + Each replaced media is recorded in ``pending_media`` (keyed by media id, + so the same media across fields uploads once) for the caller to upload + after the dataset id has been resolved. + """ if self._resources is None: return data @@ -3434,13 +3448,15 @@ def _process_data_recursively( # Avoid jsonpath-ng here: dataset writes should keep working # under python -OO where parser docstrings may be stripped. if isinstance(data, LangfuseMedia): - return self._upload_dataset_item_media( - media=data, - uploaded_media_ids=uploaded_media_ids, - dataset_id=dataset_id, - dataset_item_id=dataset_item_id, - field=field, - ) + reference_string = data._reference_string + media_id = data._media_id + if reference_string is None or media_id is None: + raise ValueError( + "Cannot create dataset item with invalid LangfuseMedia." + ) + # First field a media appears in wins; later duplicates dedupe. + pending_media.setdefault(media_id, (data, field)) + return reference_string if isinstance(data, LangfuseMediaReference): return data.reference_string if data.reference_string else data @@ -3450,8 +3466,7 @@ def _process_data_recursively( if not isinstance(data, (list, set, frozenset, dict)): return data - # Container ids only protect against recursive cycles; media upload - # dedupe is handled by uploaded_media_ids. + # Container ids only protect against recursive cycles. data_id = id(data) if data_id in ancestor_container_ids or level > max_levels: return data @@ -3476,33 +3491,6 @@ def _process_data_recursively( return _process_data_recursively(data, 1, set()) - def _upload_dataset_item_media( - self, - *, - media: LangfuseMedia, - uploaded_media_ids: set[str], - dataset_id: str, - dataset_item_id: str, - field: str, - ) -> str: - reference_string = media._reference_string - media_id = media._media_id - - if reference_string is None or media_id is None: - raise ValueError("Cannot create dataset item with invalid LangfuseMedia.") - - if media_id not in uploaded_media_ids: - assert self._resources is not None - self._resources._media_manager._upload_media_sync( - media=media, - dataset_id=dataset_id, - dataset_item_id=dataset_item_id, - field=field, - ) - uploaded_media_ids.add(media_id) - - return reference_string - def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: media_references = item.media_references or [] if not media_references: diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py index 342b8d8fa..d27ed6308 100644 --- a/tests/unit/test_datasets.py +++ b/tests/unit/test_datasets.py @@ -253,3 +253,25 @@ def test_create_dataset_item_processes_media_in_sets(): assert dataset_items_api.create.call_args.kwargs["input"] == { "images": {media._reference_string} } + + +def test_create_dataset_item_skips_dataset_lookup_without_media(): + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) + + client.create_dataset_item( + dataset_name="dataset", + input={"question": "no media here"}, + expected_output="plain text", + metadata={"k": "v"}, + ) + + # No media to upload, so the dataset id is never looked up. + datasets_api.get.assert_not_called() + media_manager._upload_media_sync.assert_not_called()