Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigframes/session/bq_caching_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
# Might be better as a queue and a worker thread
with self._upload_lock:
if local_table not in self.cache._uploaded_local_data:
uploaded = self.loader.load_data(
uploaded = self.loader.load_data_or_write_data(
local_table, bigframes.core.guid.generate_guid()
)
self.cache.cache_remote_replacement(local_table, uploaded)
Expand Down
44 changes: 31 additions & 13 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,16 +350,38 @@ def read_managed_data(
session=self._session,
)

def load_data_or_write_data(
    self,
    data: local_data.ManagedArrowTable,
    offsets_col: str,
) -> bq_data.BigqueryDataSource:
    """Upload local data to BigQuery via whichever API supports its schema.

    Prefers the load API; falls back to the write API whenever any column
    dtype is not loadable (e.g. nested JSON — see _is_dtype_can_load).
    """
    loadable = all(
        _is_dtype_can_load(item.column, item.dtype)
        for item in data.schema.items
    )
    if loadable:
        return self.load_data(data, offsets_col=offsets_col)
    return self.write_data(data, offsets_col=offsets_col)

def load_data(
self,
data: local_data.ManagedArrowTable,
offsets_col: str,
) -> bq_data.BigqueryDataSource:
"""Load managed data into bigquery"""

# JSON support incomplete
for item in data.schema.items:
_validate_dtype_can_load(item.column, item.dtype)
cannot_load_columns = {
item.column: item.dtype
for item in data.schema.items
if not _is_dtype_can_load(item.column, item.dtype)
}

if cannot_load_columns:
raise NotImplementedError(
f"Nested JSON types are currently unsupported for BigQuery Load API. "
f"Unsupported columns: {cannot_load_columns}. {constants.FEEDBACK_LINK}"
)

schema_w_offsets = data.schema.append(
schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
Expand Down Expand Up @@ -1474,31 +1496,27 @@ def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
return configuration


def _is_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype) -> bool:
    """
    Determines whether a datatype is supported by bq load jobs.

    Due to a BigQuery IO limitation with loading JSON from Parquet files (b/374784249),
    we're using a workaround: storing JSON as strings and then parsing them into JSON
    objects.
    TODO(b/395912450): Remove workaround solution once b/374784249 got resolved.

    Args:
        name: Column name; kept for signature parity with callers, not used
            in the check itself.
        column_type: The bigframes dtype to test.

    Returns:
        True if the dtype can go through a load job, False otherwise.
        Currently only nested JSON arrow types are reported as unloadable.
    """
    # we can handle top-level json, but not nested yet through string conversion
    if column_type == bigframes.dtypes.JSON_DTYPE:
        return True

    # JSON nested inside struct/list arrow types cannot be loaded yet.
    if isinstance(
        column_type, pandas.ArrowDtype
    ) and bigframes.dtypes.contains_db_dtypes_json_arrow_type(
        column_type.pyarrow_dtype
    ):
        return False

    return True


# itertools.batched not available in python <3.12, so we use this instead
Expand Down
8 changes: 6 additions & 2 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,9 @@ def test_read_pandas_w_nested_json_fails(session, write_engine):
pa.list_(pa.struct([("json_field", bigframes.dtypes.JSON_ARROW_TYPE)]))
),
)
with pytest.raises(NotImplementedError, match="Nested JSON types, found in column"):
with pytest.raises(
NotImplementedError, match="Nested JSON types are currently unsupported"
):
session.read_pandas(pd_s, write_engine=write_engine)


Expand Down Expand Up @@ -1178,7 +1180,9 @@ def test_read_pandas_w_nested_json_index_fails(session, write_engine):
pa.list_(pa.struct([("json_field", bigframes.dtypes.JSON_ARROW_TYPE)]))
),
)
with pytest.raises(NotImplementedError, match="Nested JSON types, found in"):
with pytest.raises(
NotImplementedError, match="Nested JSON types are currently unsupported"
):
session.read_pandas(pd_idx, write_engine=write_engine)


Expand Down
Loading