Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,10 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty

return ".".join(segment.strip() for segment in tuple_identifier)

def supports_server_side_planning(self) -> bool:
    """Report whether this catalog can plan table scans on the server.

    Returns:
        Always False in this implementation.
    """
    return False

@staticmethod
def identifier_to_database(
identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError
Expand Down
126 changes: 120 additions & 6 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import deque
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Union,
)

from pydantic import ConfigDict, Field, field_validator
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
from requests import HTTPError, Session
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt

Expand All @@ -36,6 +37,16 @@
)
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
from pyiceberg.catalog.rest.response import _handle_non_200_response
from pyiceberg.catalog.rest.scan_planning import (
FetchScanTasksRequest,
PlanCancelled,
PlanCompleted,
PlanFailed,
PlanningResponse,
PlanSubmitted,
PlanTableScanRequest,
ScanTasks,
)
from pyiceberg.exceptions import (
AuthorizationExpiredError,
CommitFailedException,
Expand All @@ -44,6 +55,7 @@
NamespaceNotEmptyError,
NoSuchIdentifierError,
NoSuchNamespaceError,
NoSuchPlanTaskError,
NoSuchTableError,
NoSuchViewError,
TableAlreadyExistsError,
Expand All @@ -56,6 +68,7 @@
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
FileScanTask,
StagedTable,
Table,
TableIdentifier,
Expand Down Expand Up @@ -315,6 +328,9 @@ class ListViewsResponse(IcebergBaseModel):
identifiers: list[ListViewResponseEntry] = Field()


# Validator for scan-planning responses, created once at module import and reused by
# RestCatalog._plan_table_scan to parse the JSON body returned for each plan request.
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)


class RestCatalog(Catalog):
uri: str
_session: Session
Expand Down Expand Up @@ -374,15 +390,113 @@ def _create_session(self) -> Session:

return session

def is_rest_scan_planning_enabled(self) -> bool:
"""Check if rest server-side scan planning is enabled.
def supports_server_side_planning(self) -> bool:
    """Check if the catalog supports server-side scan planning.

    Returns:
        False when the submit-table-scan-plan capability is not among this
        catalog's supported endpoints; otherwise whatever ``property_as_bool``
        reports for the REST scan-planning property (with its default).
    """
    if Capability.V1_SUBMIT_TABLE_SCAN_PLAN not in self._supported_endpoints:
        # Without the endpoint the client-side property is never consulted.
        return False
    return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT)

@retry(**_RETRY_ARGS)
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
    """Submit a scan plan request to the REST server.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Returns:
        The parsed planning response describing the status of the scan plan request.

    Raises:
        NoSuchTableError: If a table with the given identifier does not exist.
    """
    self._check_endpoint(Capability.V1_SUBMIT_TABLE_SCAN_PLAN)
    response = self._session.post(
        self.url(Endpoints.plan_table_scan, prefixed=True, **self._split_identifier_for_path(identifier)),
        # Fields left as None are dropped from the payload instead of being sent as null.
        data=request.model_dump_json(by_alias=True, exclude_none=True).encode(UTF8),
    )
    try:
        response.raise_for_status()
    except HTTPError as exc:
        # A 404 is mapped to NoSuchTableError; other statuses are left to the shared handler.
        _handle_non_200_response(exc, {404: NoSuchTableError})

    return _PLANNING_RESPONSE_ADAPTER.validate_json(response.text)

@retry(**_RETRY_ARGS)
def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> ScanTasks:
    """Resolve a plan task token into its batch of scan tasks.

    Args:
        identifier: Table identifier.
        plan_task: The plan task token from a previous response.

    Returns:
        ScanTasks parsed from the response body; it carries file scan tasks and
        may carry further plan-task tokens.

    Raises:
        NoSuchPlanTaskError: If a plan task with the given identifier or task does not exist.
    """
    self._check_endpoint(Capability.V1_TABLE_SCAN_PLAN_TASKS)

    payload = FetchScanTasksRequest(plan_task=plan_task)
    endpoint_url = self.url(Endpoints.fetch_scan_tasks, prefixed=True, **self._split_identifier_for_path(identifier))
    encoded_body = payload.model_dump_json(by_alias=True).encode(UTF8)

    http_response = self._session.post(endpoint_url, data=encoded_body)
    try:
        http_response.raise_for_status()
    except HTTPError as http_error:
        # A 404 is mapped to NoSuchPlanTaskError; other statuses are left to the shared handler.
        _handle_non_200_response(http_error, {404: NoSuchPlanTaskError})

    return ScanTasks.model_validate_json(http_response.text)

def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
    """Plan a table scan on the server and gather every resulting FileScanTask.

    Submits the plan request, rejects any non-completed outcome, then follows
    plan-task tokens until none remain.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Returns:
        List of FileScanTask objects: those from the initial response first,
        followed by those from each fetched batch in the order tokens are processed.

    Raises:
        RuntimeError: If planning fails, is cancelled, or returns unexpected response.
        NotImplementedError: If async planning is required but not yet supported.
    """
    plan = self._plan_table_scan(identifier, request)

    if isinstance(plan, PlanFailed):
        if plan.error:
            error_msg = plan.error.message
        else:
            error_msg = "unknown error"
        raise RuntimeError(f"Received status: failed: {error_msg}")

    if isinstance(plan, PlanCancelled):
        raise RuntimeError("Received status: cancelled")

    if isinstance(plan, PlanSubmitted):
        # TODO: implement polling for async planning
        raise NotImplementedError(f"Async scan planning not yet supported for planId: {plan.plan_id}")

    if not isinstance(plan, PlanCompleted):
        raise RuntimeError(f"Invalid planStatus for response: {type(plan).__name__}")

    # Tasks delivered inline with the completed plan.
    collected: list[FileScanTask] = [
        FileScanTask.from_rest_response(rest_task, plan.delete_files) for rest_task in plan.file_scan_tasks
    ]

    # Each fetched batch may hand back more tokens; process them first-in, first-out.
    token_queue = deque(plan.plan_tasks)
    while token_queue:
        batch = self._fetch_scan_tasks(identifier, token_queue.popleft())
        collected.extend(FileScanTask.from_rest_response(rest_task, batch.delete_files) for rest_task in batch.file_scan_tasks)
        token_queue.extend(batch.plan_tasks)

    return collected

def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
"""Create the LegacyOAuth2AuthManager by fetching required properties.
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class NoSuchNamespaceError(Exception):
"""Raised when a referenced name-space is not found."""


class NoSuchPlanTaskError(Exception):
    """Signals that a referenced scan plan task could not be found."""


class RESTError(Exception):
"""Raises when there is an unknown response from the REST Catalog."""

Expand Down
22 changes: 22 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,28 @@ def __repr__(self) -> str:
"""Return the string representation of the DataFileContent class."""
return f"DataFileContent.{self.name}"

@staticmethod
def from_rest_type(content_type: str) -> DataFileContent:
    """Translate a REST API content type string into a DataFileContent member.

    Args:
        content_type: REST API content type; one of "data",
            "position-deletes" or "equality-deletes" (matched exactly).

    Returns:
        The corresponding DataFileContent enum value.

    Raises:
        ValueError: If the content type is unknown.
    """
    known_types = {
        "data": DataFileContent.DATA,
        "position-deletes": DataFileContent.POSITION_DELETES,
        "equality-deletes": DataFileContent.EQUALITY_DELETES,
    }
    resolved = known_types.get(content_type)
    if resolved is None:
        raise ValueError(f"Invalid file content value: {content_type}")
    return resolved


class ManifestContent(int, Enum):
DATA = 0
Expand Down
Loading