Skip to content

Commit b3c468e

Browse files
committed
Table commit retries based on table properties
1 parent c311dac commit b3c468e

10 files changed

Lines changed: 293 additions & 7 deletions

File tree

pyiceberg/catalog/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from pyiceberg.table import (
4646
CommitTableRequest,
4747
CommitTableResponse,
48+
CommitTableRetryableExceptions,
4849
Table,
4950
)
5051
from pyiceberg.table.metadata import TableMetadata
@@ -417,6 +418,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
417418
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
418419
"""
419420

421+
@abstractmethod
422+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
423+
"""Return list of retryable table commit exception for the catalog.
424+
425+
Returns:
426+
CommitTableRetryableExceptions: Collection of commit exceptions to handle.
427+
"""
428+
420429
@abstractmethod
421430
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
422431
"""Create a namespace in the catalog.

pyiceberg/catalog/dynamodb.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
5252
from pyiceberg.schema import Schema
5353
from pyiceberg.serializers import FromInputFile
54-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
54+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table
5555
from pyiceberg.table.metadata import new_table_metadata
5656
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5757
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -195,6 +195,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
195195
"""
196196
raise NotImplementedError
197197

198+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
199+
"""Return commit exceptions that can be retried by the table.
200+
201+
Returns:
202+
CommitTableRetryableExceptions: The retryable exceptions.
203+
"""
204+
return CommitTableRetryableExceptions((GenericDynamoDbError,), ())
205+
198206
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
199207
"""Update the table.
200208

pyiceberg/catalog/glue.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6363
from pyiceberg.schema import Schema, SchemaVisitor, visit
6464
from pyiceberg.serializers import FromInputFile
65-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
65+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
6666
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
6767
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6868
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -417,6 +417,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
417417
"""
418418
raise NotImplementedError
419419

420+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
421+
"""Return commit exceptions that can be retried by the table.
422+
423+
Returns:
424+
CommitTableRetryableExceptions: The retryable exceptions.
425+
"""
426+
return CommitTableRetryableExceptions((NoSuchTableError,), (CommitFailedException,))
427+
420428
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
421429
"""Update the table.
422430

pyiceberg/catalog/hive.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,14 @@
7474
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
7575
from pyiceberg.schema import Schema, SchemaVisitor, visit
7676
from pyiceberg.serializers import FromInputFile
77-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
77+
from pyiceberg.table import (
78+
CommitTableRequest,
79+
CommitTableResponse,
80+
CommitTableRetryableExceptions,
81+
Table,
82+
TableProperties,
83+
update_table_metadata,
84+
)
7885
from pyiceberg.table.metadata import new_table_metadata
7986
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
8087
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -351,6 +358,9 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque
351358

352359
return lock_request
353360

361+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
362+
return CommitTableRetryableExceptions((NoSuchTableError, NoSuchIcebergTableError), ())
363+
354364
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
355365
"""Update the table.
356366

pyiceberg/catalog/noop.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from pyiceberg.table import (
2929
CommitTableRequest,
3030
CommitTableResponse,
31+
CommitTableRetryableExceptions,
3132
Table,
3233
)
3334
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -73,6 +74,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
7374
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
7475
raise NotImplementedError
7576

77+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
78+
return CommitTableRetryableExceptions((), ())
79+
7680
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
7781
raise NotImplementedError
7882

pyiceberg/catalog/rest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
from pyiceberg.table import (
6262
CommitTableRequest,
6363
CommitTableResponse,
64+
CommitTableRetryableExceptions,
6465
Table,
6566
TableIdentifier,
6667
)
@@ -616,6 +617,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
616617

617618
return self.load_table(to_identifier)
618619

620+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
621+
return CommitTableRetryableExceptions((CommitStateUnknownException,), (CommitFailedException,))
622+
619623
@retry(**_RETRY_ARGS)
620624
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
621625
"""Update the table.

pyiceberg/catalog/sql.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
)
4949
from pyiceberg.exceptions import (
5050
CommitFailedException,
51+
CommitStateUnknownException,
5152
NamespaceAlreadyExistsError,
5253
NamespaceNotEmptyError,
5354
NoSuchNamespaceError,
@@ -59,7 +60,7 @@
5960
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6061
from pyiceberg.schema import Schema
6162
from pyiceberg.serializers import FromInputFile
62-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
63+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
6364
from pyiceberg.table.metadata import new_table_metadata
6465
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6566
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -360,6 +361,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
360361
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e
361362
return self.load_table(to_identifier)
362363

364+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
365+
return CommitTableRetryableExceptions((CommitStateUnknownException, NoSuchTableError), (CommitFailedException,))
366+
363367
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
364368
"""Update one or more tables.
365369

pyiceberg/table/__init__.py

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from copy import copy
2525
from dataclasses import dataclass
2626
from enum import Enum
27-
from functools import cached_property, singledispatch
27+
from functools import cached_property, partial, singledispatch
2828
from itertools import chain
2929
from typing import (
3030
TYPE_CHECKING,
@@ -39,17 +39,30 @@
3939
Optional,
4040
Set,
4141
Tuple,
42+
Type,
4243
TypeVar,
4344
Union,
4445
)
4546

4647
from pydantic import Field, field_validator
4748
from sortedcontainers import SortedList
49+
from tenacity import (
50+
RetryError,
51+
Retrying,
52+
retry_if_exception_type,
53+
stop_after_attempt,
54+
stop_after_delay,
55+
wait_exponential,
56+
)
4857
from typing_extensions import Annotated
4958

5059
import pyiceberg.expressions.parser as parser
5160
import pyiceberg.expressions.visitors as visitors
52-
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
61+
from pyiceberg.exceptions import (
62+
CommitFailedException,
63+
ResolveError,
64+
ValidationError,
65+
)
5366
from pyiceberg.expressions import (
5467
AlwaysTrue,
5568
And,
@@ -947,6 +960,97 @@ class CommitTableResponse(IcebergBaseModel):
947960
metadata_location: str = Field(alias="metadata-location")
948961

949962

963+
class CommitTableRetryableExceptions:
964+
"""A catalogs commit exceptions that are retryable."""
965+
966+
def __init__(self, retry_exceptions: tuple[Type[Exception], ...], retry_refresh_exceptions: tuple[Type[Exception], ...]):
967+
self.retry_exceptions: tuple[Type[Exception], ...] = retry_exceptions
968+
self.retry_refresh_exceptions: tuple[Type[Exception], ...] = retry_refresh_exceptions
969+
self.all: tuple[Type[Exception], ...] = tuple(set(retry_exceptions).union(retry_refresh_exceptions))
970+
971+
972+
class TableCommitRetry:
973+
"""Decorator for building the table commit retry controller."""
974+
975+
num_retries = "commit.retry.num-retries"
976+
num_retries_default: int = 4
977+
min_wait_ms = "commit.retry.min-wait-ms"
978+
min_wait_ms_default: int = 100
979+
max_wait_ms = "commit.retry.max-wait-ms"
980+
max_wait_ms_default: int = 60000 # 1 min
981+
total_timeout_ms = "commit.retry.total-timeout-ms"
982+
total_timeout_ms_default: int = 1800000 # 30 mins
983+
984+
properties_attr: str = "properties"
985+
refresh_attr: str = "refresh"
986+
commit_retry_exceptions_attr: str = "commit_retry_exceptions"
987+
988+
def __init__(self, func: Callable[..., Any]) -> None:
989+
self.func: Callable[..., Any] = func
990+
self.loaded_properties: Properties = {}
991+
self.loaded_exceptions: CommitTableRetryableExceptions = CommitTableRetryableExceptions((), ())
992+
993+
def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
994+
"""Return the __call__ method with the instance caller."""
995+
return partial(self.__call__, instance)
996+
997+
def __call__(self, instance: Table, *args: Any, **kwargs: Any) -> Any:
998+
"""Run function with the retrying controller on the caller instance."""
999+
self.loaded_properties = getattr(instance, self.properties_attr)
1000+
self.loaded_exceptions = getattr(instance, self.commit_retry_exceptions_attr)
1001+
previous_attempt_error = None
1002+
try:
1003+
for attempt in self.build_retry_controller():
1004+
with attempt:
1005+
# Refresh table is previous exception requires a refresh
1006+
if previous_attempt_error in self.loaded_exceptions.retry_refresh_exceptions:
1007+
self.refresh_table(instance)
1008+
1009+
result = self.func(instance, *args, **kwargs)
1010+
1011+
# Grab exception from the attempt
1012+
outcome = attempt.retry_state.outcome
1013+
previous_attempt_error = type(outcome.exception()) if outcome.failed else None
1014+
1015+
except RetryError as err:
1016+
raise Exception from err.reraise()
1017+
else:
1018+
return result
1019+
1020+
def build_retry_controller(self) -> Retrying:
1021+
"""Build the retry controller."""
1022+
return Retrying(
1023+
stop=(
1024+
stop_after_attempt(self.get_config(self.num_retries, self.num_retries_default))
1025+
| stop_after_delay(
1026+
datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default))
1027+
)
1028+
),
1029+
wait=wait_exponential(
1030+
min=self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0,
1031+
max=self.get_config(self.max_wait_ms, self.max_wait_ms_default) / 1000.0,
1032+
),
1033+
retry=retry_if_exception_type(self.loaded_exceptions.all),
1034+
)
1035+
1036+
def get_config(self, config: str, default: int) -> int:
1037+
"""Get config out of the properties."""
1038+
return self.to_int(value, default, config) if (value := self.loaded_properties.get(config)) else default
1039+
1040+
def refresh_table(self, instance: Table) -> None:
1041+
getattr(instance, self.refresh_attr)()
1042+
return
1043+
1044+
@staticmethod
1045+
def to_int(v: str, default: int, config: str) -> int:
1046+
"""Convert str value to int, otherwise return a default."""
1047+
try:
1048+
return int(v)
1049+
except (ValueError, TypeError):
1050+
warnings.warn(f"Expected an integer for table property {config}, got: {v}", category=UserWarning)
1051+
return default
1052+
1053+
9501054
class Table:
9511055
identifier: Identifier = Field()
9521056
metadata: TableMetadata
@@ -1188,6 +1292,12 @@ def refs(self) -> Dict[str, SnapshotRef]:
11881292
"""Return the snapshot references in the table."""
11891293
return self.metadata.refs
11901294

1295+
@property
1296+
def commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
1297+
"""Return the commit exceptions that can be retried on the catalog."""
1298+
return self.catalog._accepted_commit_retry_exceptions() # pylint: disable=W0212
1299+
1300+
@TableCommitRetry
11911301
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
11921302
response = self.catalog._commit_table( # pylint: disable=W0212
11931303
CommitTableRequest(
@@ -1702,7 +1812,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
17021812
visit_with_partner(
17031813
Catalog._convert_schema_if_needed(new_schema),
17041814
-1,
1705-
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore
1815+
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
1816+
# type: ignore
17061817
PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
17071818
)
17081819
return self

tests/catalog/test_base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
PropertiesUpdateSummary,
3838
)
3939
from pyiceberg.exceptions import (
40+
CommitFailedException,
41+
CommitStateUnknownException,
4042
NamespaceAlreadyExistsError,
4143
NamespaceNotEmptyError,
4244
NoSuchNamespaceError,
@@ -50,6 +52,7 @@
5052
AddSchemaUpdate,
5153
CommitTableRequest,
5254
CommitTableResponse,
55+
CommitTableRetryableExceptions,
5356
Namespace,
5457
SetCurrentSchemaUpdate,
5558
Table,
@@ -130,6 +133,9 @@ def create_table(
130133
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
131134
raise NotImplementedError
132135

136+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
137+
return CommitTableRetryableExceptions((CommitStateUnknownException, NoSuchTableError), (CommitFailedException,))
138+
133139
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
134140
identifier_tuple = self.identifier_to_tuple_without_catalog(
135141
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])

0 commit comments

Comments
 (0)