Skip to content

Commit ffa5202

Browse files
committed
fix: upload local data through write API if nested JSONs detected
1 parent fa97675 commit ffa5202

File tree

3 files changed

+33
-15
lines changed

3 files changed

+33
-15
lines changed

bigframes/session/bq_caching_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
604604
# Might be better as a queue and a worker thread
605605
with self._upload_lock:
606606
if local_table not in self.cache._uploaded_local_data:
607-
uploaded = self.loader.load_data(
607+
uploaded = self.loader.load_data_or_write_data(
608608
local_table, bigframes.core.guid.generate_guid()
609609
)
610610
self.cache.cache_remote_replacement(local_table, uploaded)

bigframes/session/loader.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -350,16 +350,38 @@ def read_managed_data(
350350
session=self._session,
351351
)
352352

353+
def load_data_or_write_data(
354+
self,
355+
data: local_data.ManagedArrowTable,
356+
offsets_col: str,
357+
) -> bq_data.BigqueryDataSource:
358+
"""Write local data into BigQuery using the load API if possible,
359+
otherwise use the write API."""
360+
can_load = all(
361+
_is_dtype_can_load(item.column, item.dtype) for item in data.schema.items
362+
)
363+
if can_load:
364+
return self.load_data(data, offsets_col=offsets_col)
365+
else:
366+
return self.write_data(data, offsets_col=offsets_col)
367+
353368
def load_data(
354369
self,
355370
data: local_data.ManagedArrowTable,
356371
offsets_col: str,
357372
) -> bq_data.BigqueryDataSource:
358373
"""Load managed data into bigquery"""
359-
360-
# JSON support incomplete
361-
for item in data.schema.items:
362-
_validate_dtype_can_load(item.column, item.dtype)
374+
cannot_load_columns = {
375+
item.column: item.dtype
376+
for item in data.schema.items
377+
if not _is_dtype_can_load(item.column, item.dtype)
378+
}
379+
380+
if cannot_load_columns:
381+
raise NotImplementedError(
382+
f"Nested JSON types are currently unsupported for BigQuery Load API. "
383+
f"Unsupported columns: {cannot_load_columns}. {constants.FEEDBACK_LINK}"
384+
)
363385

364386
schema_w_offsets = data.schema.append(
365387
schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
@@ -1474,31 +1496,27 @@ def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
14741496
return configuration
14751497

14761498

1477-
def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
1499+
def _is_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype) -> bool:
14781500
"""
14791501
Determines whether a datatype is supported by bq load jobs.
14801502
14811503
Due to a BigQuery IO limitation with loading JSON from Parquet files (b/374784249),
14821504
we're using a workaround: storing JSON as strings and then parsing them into JSON
14831505
objects.
14841506
TODO(b/395912450): Remove workaround solution once b/374784249 got resolved.
1485-
1486-
Raises:
1487-
NotImplementedError: Type is not yet supported by load jobs.
14881507
"""
14891508
# we can handle top-level json, but not nested yet through string conversion
14901509
if column_type == bigframes.dtypes.JSON_DTYPE:
1491-
return
1510+
return True
14921511

14931512
if isinstance(
14941513
column_type, pandas.ArrowDtype
14951514
) and bigframes.dtypes.contains_db_dtypes_json_arrow_type(
14961515
column_type.pyarrow_dtype
14971516
):
1498-
raise NotImplementedError(
1499-
f"Nested JSON types, found in column `{name}`: `{column_type}`', "
1500-
f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
1501-
)
1517+
return False
1518+
1519+
return True
15021520

15031521

15041522
# itertools.batched not available in python <3.12, so we use this instead

tests/system/small/test_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ def test_read_pandas_w_nested_json_fails(session, write_engine):
10921092
pa.list_(pa.struct([("json_field", bigframes.dtypes.JSON_ARROW_TYPE)]))
10931093
),
10941094
)
1095-
with pytest.raises(NotImplementedError, match="Nested JSON types, found in column"):
1095+
with pytest.raises(NotImplementedError, match="Nested JSON types are currently unsupported"):
10961096
session.read_pandas(pd_s, write_engine=write_engine)
10971097

10981098

0 commit comments

Comments
 (0)