Skip to content

Commit c166ff0

Browse files
committed
remove orphan files
1 parent a9ad3a3 commit c166ff0

8 files changed

Lines changed: 934 additions & 1 deletion

File tree

mkdocs/docs/api.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1516,6 +1516,28 @@ def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]):
15161516
cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111])
15171517
```
15181518

1519+
### Remove Orphan Files
1520+
1521+
Remove files in the table's storage location that are not reachable from any valid snapshot or metadata file. Such orphan files typically appear when failed writes or aborted compactions leave residual data files behind. The table property `gc.enabled` must be enabled (it defaults to `true`).
1522+
1523+
!!! warning
1524+
Removing orphan files is destructive and irreversible. Always start with `dry_run()` to inspect the candidates, and make sure no other table or in-flight writer is using the same storage location.
1525+
1526+
```python
1527+
from datetime import datetime, timedelta, timezone
1528+
1529+
# Dry run — list orphans without deleting
1530+
result = table.maintenance.remove_orphan_files() \
1531+
.older_than(timedelta(days=7)) \
1532+
.dry_run() \
1533+
.execute()
1534+
1535+
# Actually delete
1536+
table.maintenance.remove_orphan_files() \
1537+
.older_than(datetime.now(tz=timezone.utc) - timedelta(days=7)) \
1538+
.execute()
1539+
```
1540+
15191541
## Views
15201542

15211543
PyIceberg supports view operations.

pyiceberg/io/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import logging
3030
import warnings
3131
from abc import ABC, abstractmethod
32+
from collections.abc import Iterator
33+
from dataclasses import dataclass
34+
from datetime import datetime
3235
from io import SEEK_SET
3336
from types import TracebackType
3437
from typing import (
@@ -254,6 +257,15 @@ def create(self, overwrite: bool = False) -> OutputStream:
254257
"""
255258

256259

260+
@dataclass(frozen=True)
class FileEntry:
    """Metadata-only description of a single file, as produced by FileIO.list_prefix.

    Instances are immutable and carry no file contents.
    """

    # URI or plain path identifying the file.
    location: str
    # Size reported by the listing backend; presumably in bytes — TODO confirm.
    size: int
    # Last modification time, or None when the backend does not provide one.
    last_modified: datetime | None = None
267+
268+
257269
class FileIO(ABC):
258270
"""A base class for FileIO implementations."""
259271

@@ -291,6 +303,20 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
291303
FileNotFoundError: When the file at the provided location does not exist.
292304
"""
293305

306+
def list_prefix(self, location: str) -> Iterator[FileEntry]:
    """Enumerate all files found at any depth beneath a location.

    This default implementation offers no listing capability and always
    raises; FileIO implementations that can enumerate their storage override
    it and produce one metadata-only FileEntry per file.

    Args:
        location (str): The URI or path whose contents should be listed recursively.

    Raises:
        NotImplementedError: When the concrete FileIO has no listing support.
    """
    implementation = type(self).__name__
    raise NotImplementedError(f"{implementation} does not support list_prefix")
319+
294320

295321
LOCATION = "location"
296322
WAREHOUSE = "warehouse"

pyiceberg/io/fsspec.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@
2222
import logging
2323
import os
2424
import threading
25-
from collections.abc import Callable
25+
from collections.abc import Callable, Iterator
2626
from copy import copy
27+
from datetime import datetime, timezone
2728
from functools import lru_cache
2829
from typing import (
2930
TYPE_CHECKING,
@@ -83,6 +84,7 @@
8384
S3_SIGNER_ENDPOINT,
8485
S3_SIGNER_ENDPOINT_DEFAULT,
8586
S3_SIGNER_URI,
87+
FileEntry,
8688
FileIO,
8789
InputFile,
8890
InputStream,
@@ -467,6 +469,30 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
467469
fs = self._get_fs_from_uri(uri)
468470
fs.rm(str_location)
469471

472+
def list_prefix(self, location: str) -> Iterator[FileEntry]:
    """Yield a FileEntry for each file found at any depth below ``location``.

    Listing results whose ``type`` is present and is not ``"file"`` are
    skipped. The modification time is taken from the ``mtime``,
    ``LastModified`` or ``last_modified`` detail keys, in that order of
    preference: datetimes are passed through unchanged, numeric values are
    converted as POSIX timestamps in UTC, and anything else becomes ``None``.

    Args:
        location (str): A URI or path to recursively list.

    Yields:
        FileEntry: Location, size and (when available) modification time of each file.
    """
    parsed = urlparse(location)
    filesystem = self._get_fs_from_uri(parsed)
    # Schemeless and file:// locations keep the path exactly as the filesystem
    # reports it; for every other scheme the scheme is prepended to that path.
    # NOTE(review): only the scheme is re-attached, not any authority part of
    # the original URI — confirm find() returns everything callers compare against.
    keep_reported_path = parsed.scheme in ("", "file")

    for reported_path, details in filesystem.find(location, detail=True).items():
        if details.get("type") in (None, "file"):
            raw_mtime = details.get("mtime") or details.get("LastModified") or details.get("last_modified")

            modified_at: datetime | None = None
            if isinstance(raw_mtime, datetime):
                modified_at = raw_mtime
            elif isinstance(raw_mtime, (int, float)):
                modified_at = datetime.fromtimestamp(raw_mtime, tz=timezone.utc)

            full_location = reported_path if keep_reported_path else f"{parsed.scheme}://{reported_path}"
            # A missing or empty size is reported as 0.
            byte_count = int(details.get("size") or 0)
            yield FileEntry(location=full_location, size=byte_count, last_modified=modified_at)
495+
470496
def _get_fs_from_uri(self, uri: "ParseResult") -> AbstractFileSystem:
471497
"""Get a filesystem from a parsed URI, using hostname for ADLS account resolution."""
472498
if uri.scheme in _ADLS_SCHEMES:

pyiceberg/io/pyarrow.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from pyarrow._s3fs import S3RetryStrategy
6161
from pyarrow.fs import (
6262
FileInfo,
63+
FileSelector,
6364
FileSystem,
6465
FileType,
6566
)
@@ -114,6 +115,7 @@
114115
S3_ROLE_SESSION_NAME,
115116
S3_SECRET_ACCESS_KEY,
116117
S3_SESSION_TOKEN,
118+
FileEntry,
117119
FileIO,
118120
InputFile,
119121
InputStream,
@@ -674,6 +676,36 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
674676
raise PermissionError(f"Cannot delete file, access denied: {location}") from e
675677
raise # pragma: no cover - If some other kind of OSError, raise the raw error
676678

679+
def list_prefix(self, location: str) -> Iterator[FileEntry]:
    """Yield a FileEntry for each file found at any depth below ``location``.

    A location that does not exist is tolerated by the selector
    (``allow_not_found=True``). Only entries of type ``FileType.File`` are
    reported; their paths are turned back into full locations by prepending
    a prefix derived from the scheme of the original ``location``.

    Args:
        location (str): A URI or path to recursively list.

    Yields:
        FileEntry: Location, size and modification time of each file.
    """
    parsed = urlparse(location)
    scheme, netloc, root = self.parse_location(location, self.properties)
    filesystem = self.fs_by_scheme(scheme, netloc)
    recursive_selector = FileSelector(root, recursive=True, allow_not_found=True)

    url_scheme = parsed.scheme
    # hdfs/viewfs locations get the authority re-attached and need an absolute
    # path after it; for other schemes the listed path is appended directly
    # after "scheme://", and schemeless locations are returned as listed.
    needs_root_slash = url_scheme in ("hdfs", "viewfs")
    if needs_root_slash:
        prefix = f"{url_scheme}://{netloc}"
    elif url_scheme:
        prefix = f"{url_scheme}://"
    else:
        prefix = ""

    for listed in filesystem.get_file_info(recursive_selector):
        if listed.type != FileType.File:
            continue
        file_path = listed.path
        if needs_root_slash and not file_path.startswith("/"):
            file_path = "/" + file_path
        yield FileEntry(
            location=f"{prefix}{file_path}",
            size=listed.size or 0,
            last_modified=listed.mtime,
        )
708+
677709
def __getstate__(self) -> dict[str, Any]:
678710
"""Create a dictionary of the PyArrowFileIO fields used when pickling."""
679711
fileio_copy = copy(self.__dict__)

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ class TableProperties:
205205
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
206206
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
207207

208+
GC_ENABLED = "gc.enabled"
209+
GC_ENABLED_DEFAULT = True
210+
208211

209212
class Transaction:
210213
_table: Table
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
if TYPE_CHECKING:
2626
from pyiceberg.table import Table
27+
from pyiceberg.table.maintenance.orphan_files import RemoveOrphanFiles
2728
from pyiceberg.table.update.snapshot import ExpireSnapshots
2829

2930

@@ -43,3 +44,13 @@ def expire_snapshots(self) -> ExpireSnapshots:
4344
from pyiceberg.table.update.snapshot import ExpireSnapshots
4445

4546
return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))
47+
48+
def remove_orphan_files(self) -> RemoveOrphanFiles:
    """Create a builder that removes files no longer reachable from the table.

    Returns:
        A RemoveOrphanFiles builder bound to this table, used to configure and execute the removal.
    """
    # Imported at call time; at module level the name is only available for
    # type checking (presumably to avoid a circular import — TODO confirm).
    from pyiceberg.table.maintenance.orphan_files import RemoveOrphanFiles

    builder = RemoveOrphanFiles(self.tbl)
    return builder

0 commit comments

Comments
 (0)