-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathtable_rollback.py
More file actions
62 lines (49 loc) · 2.16 KB
/
table_rollback.py
File metadata and controls
62 lines (49 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from abc import ABC, abstractmethod
class TableRollback(ABC):
    """Interface for rolling a table back to an earlier instant."""

    @abstractmethod
    def rollback_to(self, instant, from_snapshot=None):
        """Roll the table back to ``instant``.

        Args:
            instant: Target of the rollback; an Instant (SnapshotInstant or
                TagInstant).
            from_snapshot: Optional snapshot ID. When given, the rollback
                succeeds only if the table's latest snapshot is this one.
        """
class CatalogTableRollback(TableRollback):
    """Internal TableRollback that forwards the request to a catalog.

    The instance only remembers which catalog to call and which table
    identifier to pass; the rollback itself is done by
    ``catalog.rollback_to``.
    """

    def __init__(self, catalog, identifier):
        self._identifier = identifier
        self._catalog = catalog

    def rollback_to(self, instant, from_snapshot=None):
        """Ask the catalog to roll this table back to ``instant``.

        Args:
            instant: Target of the rollback; an Instant (SnapshotInstant or
                TagInstant).
            from_snapshot: Optional snapshot ID. When given, the rollback
                succeeds only if the table's latest snapshot is this one.

        Raises:
            RuntimeError: If the catalog call raises any ``Exception``. The
                message names the table identifier and the original error,
                which is also chained as ``__cause__``.
        """
        try:
            self._catalog.rollback_to(
                self._identifier, instant, from_snapshot)
        except Exception as cause:
            # Re-raise under one exception type while keeping the original
            # error reachable through exception chaining.
            message = "Failed to rollback table {}: {}".format(
                self._identifier, cause)
            raise RuntimeError(message) from cause