diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index 943eee0c12..bcb3ae60e3 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -604,7 +604,7 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
         # Might be better as a queue and a worker thread
         with self._upload_lock:
             if local_table not in self.cache._uploaded_local_data:
-                uploaded = self.loader.load_data(
+                uploaded = self.loader.load_data_or_write_data(
                     local_table, bigframes.core.guid.generate_guid()
                 )
                 self.cache.cache_remote_replacement(local_table, uploaded)
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 9d222a3755..0944c0dab6 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -350,16 +350,38 @@ def read_managed_data(
             session=self._session,
         )
 
+    def load_data_or_write_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+    ) -> bq_data.BigqueryDataSource:
+        """Write local data into BigQuery using the local API if possible,
+        otherwise use the write API."""
+        can_load = all(
+            _is_dtype_can_load(item.column, item.dtype) for item in data.schema.items
+        )
+        if can_load:
+            return self.load_data(data, offsets_col=offsets_col)
+        else:
+            return self.write_data(data, offsets_col=offsets_col)
+
     def load_data(
         self,
         data: local_data.ManagedArrowTable,
         offsets_col: str,
     ) -> bq_data.BigqueryDataSource:
         """Load managed data into bigquery"""
-
-        # JSON support incomplete
-        for item in data.schema.items:
-            _validate_dtype_can_load(item.column, item.dtype)
+        cannot_load_columns = {
+            item.column: item.dtype
+            for item in data.schema.items
+            if not _is_dtype_can_load(item.column, item.dtype)
+        }
+
+        if cannot_load_columns:
+            raise NotImplementedError(
+                f"Nested JSON types are currently unsupported for BigQuery Load API. "
+                f"Unsupported columns: {cannot_load_columns}. {constants.FEEDBACK_LINK}"
+            )
 
         schema_w_offsets = data.schema.append(
             schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
@@ -1474,7 +1496,7 @@ def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
     return configuration
 
 
-def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
+def _is_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype) -> bool:
     """
     Determines whether a datatype is supported by bq load jobs.
 
@@ -1482,23 +1504,19 @@ def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
     we're using a workaround: storing JSON as strings and then parsing them
     into JSON objects.
     TODO(b/395912450): Remove workaround solution once b/374784249 got resolved.
-
-    Raises:
-        NotImplementedError: Type is not yet supported by load jobs.
     """
     # we can handle top-level json, but not nested yet through string conversion
     if column_type == bigframes.dtypes.JSON_DTYPE:
-        return
+        return True
 
     if isinstance(
         column_type, pandas.ArrowDtype
     ) and bigframes.dtypes.contains_db_dtypes_json_arrow_type(
         column_type.pyarrow_dtype
     ):
-        raise NotImplementedError(
-            f"Nested JSON types, found in column `{name}`: `{column_type}`', "
-            f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
-        )
+        return False
+
+    return True
 
 
 # itertools.batched not available in python <3.12, so we use this instead
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 922f73a0ce..2fa633a62b 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -1092,7 +1092,9 @@ def test_read_pandas_w_nested_json_fails(session, write_engine):
             pa.list_(pa.struct([("json_field", bigframes.dtypes.JSON_ARROW_TYPE)]))
         ),
     )
-    with pytest.raises(NotImplementedError, match="Nested JSON types, found in column"):
+    with pytest.raises(
+        NotImplementedError, match="Nested JSON types are currently unsupported"
+    ):
         session.read_pandas(pd_s, write_engine=write_engine)
 
 
@@ -1178,7 +1180,9 @@ def test_read_pandas_w_nested_json_index_fails(session, write_engine):
             pa.list_(pa.struct([("json_field", bigframes.dtypes.JSON_ARROW_TYPE)]))
         ),
     )
-    with pytest.raises(NotImplementedError, match="Nested JSON types, found in"):
+    with pytest.raises(
+        NotImplementedError, match="Nested JSON types are currently unsupported"
+    ):
         session.read_pandas(pd_idx, write_engine=write_engine)
 
 