Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions paimon-python/pypaimon/api/api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
from pypaimon.table.instant import Instant


class RESTRequest(ABC):
Expand Down Expand Up @@ -84,3 +85,12 @@ class AlterTableRequest(RESTRequest):
FIELD_CHANGES = "changes"

changes: List[SchemaChange] = json_field(FIELD_CHANGES)


@dataclass
class RollbackTableRequest(RESTRequest):
    """Request payload describing a table rollback.

    Carries the instant to roll back to and, optionally, the snapshot the
    table is expected to be at when the rollback is applied.
    """
    # Names handed to json_field for the two attributes below
    # (presumably the JSON property names used on the wire -- confirm
    # against json_field).
    FIELD_INSTANT = "instant"
    FIELD_FROM_SNAPSHOT = "fromSnapshot"

    # Instant to roll the table back to.
    instant: Instant = json_field(FIELD_INSTANT)
    # Optional snapshot ID the rollback starts from; may be None.
    from_snapshot: Optional[int] = json_field(FIELD_FROM_SNAPSHOT)
4 changes: 4 additions & 0 deletions paimon-python/pypaimon/api/resource_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,7 @@ def rename_table(self) -> str:
def commit_table(self, database_name: str, table_name: str) -> str:
    """Return the path of the commit endpoint for the given table.

    Args:
        database_name: Name of the database; passed through
            ``RESTUtil.encode_string`` before being placed in the path.
        table_name: Name of the table; encoded the same way.

    Returns:
        ``<base_path>/<DATABASES>/<database>/<TABLES>/<table>/commit``.
    """
    encoded_database = RESTUtil.encode_string(database_name)
    encoded_table = RESTUtil.encode_string(table_name)
    path_segments = (
        self.base_path,
        self.DATABASES,
        encoded_database,
        self.TABLES,
        encoded_table,
    )
    return "{}/{}/{}/{}/{}/commit".format(*path_segments)

def rollback_table(self, database_name: str, table_name: str) -> str:
    """Return the path of the rollback endpoint for the given table.

    Args:
        database_name: Name of the database; passed through
            ``RESTUtil.encode_string`` before being placed in the path.
        table_name: Name of the table; encoded the same way.

    Returns:
        ``<base_path>/<DATABASES>/<database>/<TABLES>/<table>/rollback``.
    """
    encoded_database = RESTUtil.encode_string(database_name)
    encoded_table = RESTUtil.encode_string(table_name)
    path_segments = (
        self.base_path,
        self.DATABASES,
        encoded_database,
        self.TABLES,
        encoded_table,
    )
    return "{}/{}/{}/{}/{}/rollback".format(*path_segments)
24 changes: 23 additions & 1 deletion paimon-python/pypaimon/api/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

from pypaimon.api.api_request import (AlterDatabaseRequest, AlterTableRequest, CommitTableRequest,
CreateDatabaseRequest,
CreateTableRequest, RenameTableRequest)
CreateTableRequest, RenameTableRequest,
RollbackTableRequest)
from pypaimon.api.api_response import (CommitTableResponse, ConfigResponse,
GetDatabaseResponse, GetTableResponse,
GetTableTokenResponse,
Expand Down Expand Up @@ -358,6 +359,27 @@ def commit_snapshot(
)
return response.is_success()

def rollback_to(self, identifier, instant, from_snapshot=None):
    """Ask the REST server to roll a table back to ``instant``.

    The identifier is validated first, then a ``RollbackTableRequest`` is
    POSTed to the table's rollback path. The response is not inspected and
    nothing is returned.

    Args:
        identifier: The table identifier.
        instant: The Instant (SnapshotInstant or TagInstant) to rollback to.
        from_snapshot: Optional snapshot ID. Success only occurs when the
            latest snapshot is this snapshot.

    Raises:
        NoSuchResourceException: If the table, snapshot or tag does not exist.
        ForbiddenException: If no permission to access this table.
    """
    database_name, table_name = self.__validate_identifier(identifier)
    rollback_request = RollbackTableRequest(
        instant=instant,
        from_snapshot=from_snapshot,
    )
    rollback_path = self.resource_paths.rollback_table(database_name, table_name)
    self.client.post(rollback_path, rollback_request, self.rest_auth_function)

@staticmethod
def __validate_identifier(identifier: Identifier):
if not identifier:
Expand Down
17 changes: 17 additions & 0 deletions paimon-python/pypaimon/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ def commit_snapshot(

"""

def rollback_to(self, identifier, instant, from_snapshot=None):
    """Rollback table by the given identifier and instant.

    This base implementation performs no rollback; it always raises.
    Catalogs that support rollback override this method.

    Args:
        identifier: Path of the table (Identifier instance).
        instant: The Instant (SnapshotInstant or TagInstant) to rollback to.
        from_snapshot: Optional snapshot ID. Success only occurs when the
            latest snapshot is this snapshot.

    Raises:
        NotImplementedError: Always, in this default implementation.
    """
    unsupported_message = "rollback_to is not supported by this catalog."
    raise NotImplementedError(unsupported_message)

def drop_partitions(
self,
identifier: Union[str, Identifier],
Expand Down
17 changes: 17 additions & 0 deletions paimon-python/pypaimon/catalog/catalog_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Optional

from pypaimon.catalog.catalog_loader import CatalogLoader
from pypaimon.catalog.table_rollback import CatalogTableRollback
from pypaimon.common.identifier import Identifier
from pypaimon.snapshot.catalog_snapshot_commit import CatalogSnapshotCommit
from pypaimon.snapshot.renaming_snapshot_commit import RenamingSnapshotCommit
Expand Down Expand Up @@ -60,6 +61,22 @@ def snapshot_commit(self, snapshot_manager) -> Optional[SnapshotCommit]:
# to create locks based on the catalog lock context
return RenamingSnapshotCommit(snapshot_manager)

def catalog_table_rollback(self):
    """Build the TableRollback matching this catalog environment, if any.

    A rollback handler is only produced when a catalog loader is present
    and ``supports_version_management`` is truthy; the catalog is then
    loaded and wrapped, together with this environment's identifier, in a
    CatalogTableRollback.

    Returns:
        A CatalogTableRollback instance, or None when either condition
        is not met.
    """
    # Guard clause: without a loader or version management there is
    # nothing to delegate the rollback to.
    if self.catalog_loader is None or not self.supports_version_management:
        return None
    return CatalogTableRollback(self.catalog_loader.load(), self.identifier)

def copy(self, identifier: Identifier) -> 'CatalogEnvironment':
"""
Create a copy of this CatalogEnvironment with a different identifier.
Expand Down
22 changes: 22 additions & 0 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,28 @@ def alter_table(
except ForbiddenException as e:
raise TableNoPermissionException(identifier) from e

def rollback_to(self, identifier, instant, from_snapshot=None):
    """Rollback table by the given identifier and instant.

    A non-Identifier argument is parsed with ``Identifier.from_string``
    before the call is forwarded to the REST API. REST-level errors are
    translated into catalog exceptions; note that every
    NoSuchResourceException is reported as TableNotExistException.

    Args:
        identifier: Path of the table (Identifier or string).
        instant: The Instant (SnapshotInstant or TagInstant) to rollback to.
        from_snapshot: Optional snapshot ID. Success only occurs when the
            latest snapshot is this snapshot.

    Raises:
        TableNotExistException: If the REST API raises NoSuchResourceException.
        TableNoPermissionException: If the REST API raises ForbiddenException.
    """
    table_id = (
        identifier
        if isinstance(identifier, Identifier)
        else Identifier.from_string(identifier)
    )
    try:
        self.rest_api.rollback_to(table_id, instant, from_snapshot)
    except NoSuchResourceException as missing:
        raise TableNotExistException(table_id) from missing
    except ForbiddenException as denied:
        raise TableNoPermissionException(table_id) from denied

def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
try:
response = self.rest_api.get_table(identifier)
Expand Down
62 changes: 62 additions & 0 deletions paimon-python/pypaimon/catalog/table_rollback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from abc import ABC, abstractmethod


class TableRollback(ABC):
    """Abstract interface for rolling a table back to an instant.

    Subclasses must implement ``rollback_to``; this class carries no state
    and no behaviour of its own.
    """

    @abstractmethod
    def rollback_to(self, instant, from_snapshot=None):
        """Rollback table to the given instant.

        Args:
            instant: The Instant (SnapshotInstant or TagInstant) to rollback to.
            from_snapshot: Optional snapshot ID. Success only occurs when the
                latest snapshot is this snapshot.
        """


class CatalogTableRollback(TableRollback):
    """TableRollback that forwards every request to ``catalog.rollback_to``.

    The catalog and the table identifier are fixed at construction time.
    """

    def __init__(self, catalog, identifier):
        self._catalog = catalog
        self._identifier = identifier

    def rollback_to(self, instant, from_snapshot=None):
        """Rollback the bound table to the given instant via the catalog.

        Args:
            instant: The Instant (SnapshotInstant or TagInstant) to rollback to.
            from_snapshot: Optional snapshot ID. Success only occurs when the
                latest snapshot is this snapshot.

        Raises:
            RuntimeError: If the catalog call raises any Exception; the
                original exception is attached as the cause.
        """
        try:
            self._catalog.rollback_to(self._identifier, instant, from_snapshot)
        except Exception as cause:
            failure_message = "Failed to rollback table {}: {}".format(
                self._identifier, cause)
            raise RuntimeError(failure_message) from cause
15 changes: 2 additions & 13 deletions paimon-python/pypaimon/manifest/manifest_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest
def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]:
return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats)

def _entry_identifier(e: ManifestEntry) -> tuple:
return (
tuple(e.partition.values),
e.bucket,
e.file.level,
e.file.file_name,
tuple(e.file.extra_files) if e.file.extra_files else (),
e.file.embedded_index,
e.file.external_path,
)

deleted_entry_keys = set()
added_entries = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
Expand All @@ -73,11 +62,11 @@ def _entry_identifier(e: ManifestEntry) -> tuple:
if entry.kind == 0: # ADD
added_entries.append(entry)
else: # DELETE
deleted_entry_keys.add(_entry_identifier(entry))
deleted_entry_keys.add(entry.identifier())

final_entries = [
entry for entry in added_entries
if _entry_identifier(entry) not in deleted_entry_keys
if entry.identifier() not in deleted_entry_keys
]
return final_entries

Expand Down
129 changes: 129 additions & 0 deletions paimon-python/pypaimon/manifest/schema/file_entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Entry representing a file.

Follows the design of Java's org.apache.paimon.manifest.FileEntry.
"""


class FileEntry:
    """Entry representing a file.

    Two entries whose ``identifier()`` values compare equal refer to the
    same data file. Concrete entries are expected to expose ``kind``,
    ``partition``, ``bucket`` and ``file`` attributes.
    """

    # Values of ``kind`` understood by merge_entries.
    _KIND_ADD = 0
    _KIND_DELETE = 1

    class Identifier:
        """Unique identifier for a file entry.

        Uses partition, bucket, level, fileName, extraFiles,
        embeddedIndex and externalPath to identify a file.

        Equality and hashing are value based: a partition row exposing a
        ``values`` sequence is compared through ``tuple(partition.values)``
        so the identifier can be used as a set member or dict key even when
        the row object itself is unhashable or only identity-comparable.
        """

        def __init__(self, partition, bucket, level, file_name,
                     extra_files, embedded_index, external_path):
            self.partition = partition
            self.bucket = bucket
            self.level = level
            self.file_name = file_name
            self.extra_files = extra_files
            self.embedded_index = embedded_index
            self.external_path = external_path
            # Lazily computed by __hash__.
            self._hash = None

        @staticmethod
        def _normalize(value):
            """Return a hashable, value-comparable stand-in for ``value``."""
            row_values = getattr(value, "values", None)
            # ``callable`` guard: dict-like objects expose ``values`` as a
            # method, not as the row's value sequence.
            if row_values is not None and not callable(row_values):
                return tuple(row_values)
            if isinstance(value, list):
                return tuple(value)
            return value

        def _key(self):
            """Return the tuple that drives both __eq__ and __hash__."""
            return (
                self._normalize(self.partition),
                self.bucket,
                self.level,
                self.file_name,
                self._normalize(self.extra_files),
                self.embedded_index,
                self.external_path,
            )

        def __eq__(self, other):
            if self is other:
                return True
            if not isinstance(other, FileEntry.Identifier):
                return False
            return self._key() == other._key()

        def __hash__(self):
            if self._hash is None:
                self._hash = hash(self._key())
            return self._hash

    def identifier(self):
        """Build a unique Identifier for this file entry.

        Returns:
            An Identifier instance built from this entry's partition,
            bucket and file metadata; missing or empty ``extra_files``
            become an empty tuple.
        """
        extra_files = (tuple(self.file.extra_files)
                       if self.file.extra_files else ())
        return FileEntry.Identifier(
            partition=self.partition,
            bucket=self.bucket,
            level=self.file.level,
            file_name=self.file.file_name,
            extra_files=extra_files,
            embedded_index=self.file.embedded_index,
            external_path=self.file.external_path,
        )

    @staticmethod
    def merge_entries(entries):
        """Merge file entries: ADD and DELETE of the same file cancel each other.

        - ADD: if identifier already in map, raise error; otherwise add to map.
        - DELETE: if identifier already in map, remove both (cancel);
          otherwise add to map.

        Args:
            entries: Iterable of FileEntry.

        Returns:
            List of merged FileEntry values, preserving insertion order.

        Raises:
            RuntimeError: If trying to add a file that is already in the map,
                or if an entry has a kind other than ADD (0) or DELETE (1).
        """
        merged = {}

        for entry in entries:
            key = entry.identifier()
            if entry.kind == FileEntry._KIND_ADD:
                if key in merged:
                    raise RuntimeError(
                        "Trying to add file {} which is already added.".format(
                            entry.file.file_name))
                merged[key] = entry
            elif entry.kind == FileEntry._KIND_DELETE:
                if key in merged:
                    # A pending entry for the same file is cancelled out.
                    del merged[key]
                else:
                    # Nothing to cancel yet: keep the DELETE itself.
                    merged[key] = entry
            else:
                raise RuntimeError(
                    "Unknown entry kind: {}".format(entry.kind))

        return list(merged.values())
3 changes: 2 additions & 1 deletion paimon-python/pypaimon/manifest/schema/manifest_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

from pypaimon.manifest.schema.data_file_meta import (DATA_FILE_META_SCHEMA,
DataFileMeta)
from pypaimon.manifest.schema.file_entry import FileEntry
from pypaimon.table.row.generic_row import GenericRow


@dataclass
class ManifestEntry:
class ManifestEntry(FileEntry):
kind: int
partition: GenericRow
bucket: int
Expand Down
Loading