-
-
Notifications
You must be signed in to change notification settings - Fork 3.1k
fix: rm_rf now handles directories without S_IXUSR permission #14331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2afd74f
ce05986
58485d7
d9fd704
dfc6834
9511d93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fixed cleanup of temporary directories failing when subdirectories have their execute permission removed. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| from pathlib import PurePath | ||
| from posixpath import sep as posix_sep | ||
| import shutil | ||
| import stat | ||
| import sys | ||
| import types | ||
| from types import ModuleType | ||
|
|
@@ -69,6 +70,29 @@ def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: | |
| return path.joinpath(".lock") | ||
|
|
||
|
|
||
| def _chmod_rwx(p: str) -> bool: | ||
| """Grant owner sufficient permissions for deletion. | ||
|
|
||
| Directories get ``S_IRWXU`` (read+write+exec for traversal). | ||
| Regular files get ``S_IRUSR | S_IWUSR`` only, to avoid making | ||
| non-executable files executable as a side effect. | ||
|
|
||
| Returns True if permissions were actually changed, False if they were | ||
| already sufficient or couldn't be changed. | ||
| """ | ||
| try: | ||
| old_mode = os.stat(p).st_mode | ||
| perm_mode = stat.S_IMODE(old_mode) | ||
| bits = stat.S_IRWXU if stat.S_ISDIR(old_mode) else stat.S_IRUSR | stat.S_IWUSR | ||
| new_mode = perm_mode | bits | ||
| if perm_mode == new_mode: | ||
| return False | ||
| os.chmod(p, new_mode) | ||
| except OSError: | ||
| return False | ||
| return True | ||
|
|
||
|
|
||
| def on_rm_rf_error( | ||
| func: Callable[..., Any] | None, | ||
| path: str, | ||
|
|
@@ -97,32 +121,47 @@ def on_rm_rf_error( | |
| ) | ||
| return False | ||
|
|
||
| p = Path(path) | ||
|
|
||
| if func in (os.open, os.scandir): | ||
| # Directory traversal failed (e.g. missing S_IXUSR). Fix permissions | ||
|
Comment on lines
+126
to
+127
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the implicit assumption here that |
||
| # on the path and its parent (bounded by start_path), then remove it | ||
| # ourselves since rmtree skips entries after the error handler returns. | ||
| # See: https://github.com/pytest-dev/pytest/issues/7940 | ||
| parent_changed = False | ||
| parent = p.parent | ||
| if parent not in (p, start_path): | ||
| parent_changed = _chmod_rwx(str(parent)) | ||
| path_changed = _chmod_rwx(path) | ||
| if not (parent_changed or path_changed): | ||
| return False | ||
| if p.is_dir(): | ||
| rm_rf(p) | ||
| else: | ||
| try: | ||
| os.unlink(path) | ||
| except OSError: | ||
| return False | ||
| return True | ||
|
|
||
| if func not in (os.rmdir, os.remove, os.unlink): | ||
| if func not in (os.open,): | ||
| warnings.warn( | ||
| PytestWarning( | ||
| f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}" | ||
| ) | ||
| warnings.warn( | ||
| PytestWarning( | ||
| f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}" | ||
| ) | ||
| ) | ||
| return False | ||
|
|
||
| # Chmod + retry. | ||
| import stat | ||
|
|
||
| def chmod_rw(p: str) -> None: | ||
| mode = os.stat(p).st_mode | ||
| os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) | ||
|
|
||
| # For files, we need to recursively go upwards in the directories to | ||
| # ensure they all are also writable. | ||
| p = Path(path) | ||
| # ensure they all are also accessible and writable. | ||
| if p.is_file(): | ||
| for parent in p.parents: | ||
| chmod_rw(str(parent)) | ||
| for parent in p.parents: # pragma: no branch | ||
| _chmod_rwx(str(parent)) | ||
| # Stop when we reach the original path passed to rm_rf. | ||
| if parent == start_path: | ||
| break | ||
| chmod_rw(str(path)) | ||
| _chmod_rwx(str(path)) | ||
|
|
||
| func(path) | ||
| return True | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |||||
| from _pytest import pathlib | ||||||
| from _pytest.config import Config | ||||||
| from _pytest.monkeypatch import MonkeyPatch | ||||||
| from _pytest.pathlib import _chmod_rwx | ||||||
| from _pytest.pathlib import cleanup_numbered_dir | ||||||
| from _pytest.pathlib import create_cleanup_lock | ||||||
| from _pytest.pathlib import make_numbered_dir | ||||||
|
|
@@ -522,6 +523,10 @@ def test_removal_accepts_lock(self, tmp_path): | |||||
| assert dir.is_dir() | ||||||
|
|
||||||
|
|
||||||
| def _raise_oserror(*args: object, **kwargs: object) -> None: | ||||||
| raise OSError("simulated failure") | ||||||
|
|
||||||
|
|
||||||
| class TestRmRf: | ||||||
| def test_rm_rf(self, tmp_path): | ||||||
| adir = tmp_path / "adir" | ||||||
|
|
@@ -566,6 +571,25 @@ def test_rm_rf_with_read_only_directory(self, tmp_path): | |||||
|
|
||||||
| assert not adir.is_dir() | ||||||
|
|
||||||
| @pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions") | ||||||
| def test_rm_rf_with_no_exec_permission_directories(self, tmp_path): | ||||||
| """Ensure rm_rf can remove directories without S_IXUSR (#7940). | ||||||
|
|
||||||
| This is the exact scenario from the original issue: nested directories | ||||||
| and files with all permissions stripped. | ||||||
| """ | ||||||
| p = tmp_path / "foo" / "bar" / "baz" | ||||||
| p.parent.mkdir(parents=True) | ||||||
| p.touch(mode=0) | ||||||
| for parent in p.parents: # pragma: no branch | ||||||
| if parent == tmp_path: | ||||||
| break | ||||||
| parent.chmod(mode=0) | ||||||
|
|
||||||
| rm_rf(tmp_path / "foo") | ||||||
|
|
||||||
| assert not (tmp_path / "foo").exists() | ||||||
|
|
||||||
| def test_on_rm_rf_error(self, tmp_path: Path) -> None: | ||||||
| adir = tmp_path / "dir" | ||||||
| adir.mkdir() | ||||||
|
|
@@ -593,17 +617,125 @@ def test_on_rm_rf_error(self, tmp_path: Path) -> None: | |||||
| on_rm_rf_error(None, str(fn), exc_info3, start_path=tmp_path) | ||||||
| assert fn.is_file() | ||||||
|
|
||||||
| # ignored function | ||||||
| with warnings.catch_warnings(record=True) as w: | ||||||
| exc_info4 = PermissionError() | ||||||
| on_rm_rf_error(os.open, str(fn), exc_info4, start_path=tmp_path) | ||||||
| assert fn.is_file() | ||||||
| assert not [x.message for x in w] | ||||||
|
|
||||||
| # os.unlink PermissionError is handled (chmod + retry) | ||||||
| exc_info5 = PermissionError() | ||||||
| on_rm_rf_error(os.unlink, str(fn), exc_info5, start_path=tmp_path) | ||||||
| assert not fn.is_file() | ||||||
|
|
||||||
| def test_on_rm_rf_error_os_open_handles_file(self, tmp_path: Path) -> None: | ||||||
| """os.open PermissionError on a file is handled by fixing | ||||||
| permissions and removing it (#7940).""" | ||||||
| adir = tmp_path / "dir" | ||||||
| adir.mkdir() | ||||||
| fn = adir / "foo.txt" | ||||||
| fn.touch() | ||||||
| self.chmod_r(fn) | ||||||
|
|
||||||
| with warnings.catch_warnings(record=True) as w: | ||||||
| exc_info = PermissionError() | ||||||
| on_rm_rf_error(os.open, str(fn), exc_info, start_path=tmp_path) | ||||||
| assert not fn.exists() | ||||||
| assert not [x.message for x in w] | ||||||
|
|
||||||
| @pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions") | ||||||
| def test_on_rm_rf_error_os_open_handles_directory(self, tmp_path: Path) -> None: | ||||||
| """os.open PermissionError on a directory is handled by fixing | ||||||
| permissions and recursively removing it (#7940).""" | ||||||
| adir = tmp_path / "dir" | ||||||
| adir.mkdir() | ||||||
| (adir / "child").mkdir() | ||||||
| (adir / "child" / "file.txt").touch() | ||||||
| os.chmod(str(adir), 0o600) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| with warnings.catch_warnings(record=True) as w: | ||||||
| exc_info = PermissionError() | ||||||
| on_rm_rf_error(os.open, str(adir), exc_info, start_path=tmp_path) | ||||||
| assert not adir.exists() | ||||||
| assert not [x.message for x in w] | ||||||
|
|
||||||
| @pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions") | ||||||
| def test_on_rm_rf_error_os_open_parent_perms(self, tmp_path: Path) -> None: | ||||||
| """When the PermissionError is caused by the *parent* directory lacking | ||||||
| S_IXUSR, fixing the parent is sufficient even if the child already has | ||||||
| correct permissions.""" | ||||||
| parent = tmp_path / "parent" | ||||||
| parent.mkdir() | ||||||
| child = parent / "child" | ||||||
| child.mkdir() | ||||||
| (child / "file.txt").touch() | ||||||
| # Child has full perms, but parent lacks execute -> os.open(child) fails. | ||||||
| os.chmod(str(parent), 0o600) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| with warnings.catch_warnings(record=True) as w: | ||||||
| exc_info = PermissionError() | ||||||
| result = on_rm_rf_error(os.open, str(child), exc_info, start_path=tmp_path) | ||||||
| assert result is True | ||||||
| assert not child.exists() | ||||||
| assert not [x.message for x in w] | ||||||
|
|
||||||
| def test_chmod_rwx_returns_false_on_nonexistent_path(self, tmp_path: Path) -> None: | ||||||
| """_chmod_rwx returns False when the path doesn't exist (OSError).""" | ||||||
| nonexistent = tmp_path / "does_not_exist" | ||||||
| assert _chmod_rwx(str(nonexistent)) is False | ||||||
|
|
||||||
| def test_chmod_rwx_returns_false_when_already_sufficient( | ||||||
| self, tmp_path: Path | ||||||
| ) -> None: | ||||||
| """_chmod_rwx returns False when permissions are already sufficient.""" | ||||||
| d = tmp_path / "dir" | ||||||
| d.mkdir(mode=0o700) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| assert _chmod_rwx(str(d)) is False | ||||||
|
|
||||||
| f = tmp_path / "file" | ||||||
| f.touch(mode=0o600) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| assert _chmod_rwx(str(f)) is False | ||||||
|
|
||||||
| def test_on_rm_rf_error_os_open_returns_false_when_chmod_ineffective( | ||||||
| self, tmp_path: Path | ||||||
| ) -> None: | ||||||
| """os.open handler returns False when neither parent nor path chmod | ||||||
| changes anything (recursion guard).""" | ||||||
| adir = tmp_path / "dir" | ||||||
| adir.mkdir(mode=0o700) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| exc_info = PermissionError() | ||||||
| result = on_rm_rf_error(os.open, str(adir), exc_info, start_path=tmp_path) | ||||||
| assert result is False | ||||||
| assert adir.exists() | ||||||
|
|
||||||
| def test_on_rm_rf_error_os_open_unlink_fails( | ||||||
| self, tmp_path: Path, monkeypatch: MonkeyPatch | ||||||
| ) -> None: | ||||||
| """os.open handler returns False when chmod succeeds but os.unlink | ||||||
| still raises OSError (e.g. sandbox or other mechanism).""" | ||||||
| fn = tmp_path / "stubborn.txt" | ||||||
| fn.touch(mode=0) | ||||||
|
|
||||||
| monkeypatch.setattr(os, "unlink", _raise_oserror) | ||||||
|
|
||||||
| exc_info = PermissionError() | ||||||
| result = on_rm_rf_error(os.open, str(fn), exc_info, start_path=tmp_path) | ||||||
| assert result is False | ||||||
| assert fn.exists() | ||||||
|
|
||||||
| @pytest.mark.skipif(not hasattr(os, "getuid"), reason="unix permissions") | ||||||
| def test_on_rm_rf_error_chmod_retry_walks_parents(self, tmp_path: Path) -> None: | ||||||
| """The os.rmdir/os.unlink handler walks up through multiple parent | ||||||
| directories to fix permissions before retrying.""" | ||||||
| deep = tmp_path / "a" / "b" / "c" | ||||||
| deep.mkdir(parents=True) | ||||||
| fn = deep / "file.txt" | ||||||
| fn.touch() | ||||||
| # Remove write from intermediate dirs (keep exec so traversal works, | ||||||
| # but os.unlink needs write on the parent). | ||||||
| for parent in fn.parents: # pragma: no branch | ||||||
| if parent == tmp_path: | ||||||
| break | ||||||
| parent.chmod(0o555) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||||||
|
|
||||||
| exc_info = PermissionError() | ||||||
| on_rm_rf_error(os.unlink, str(fn), exc_info, start_path=tmp_path) | ||||||
| assert not fn.exists() | ||||||
|
|
||||||
|
|
||||||
| def attempt_symlink_to(path, to_path): | ||||||
| """Try to make a symlink from "path" to "to_path", skipping in case this platform | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this
OSErrorintended to cover even theos.statcalls? If not we should use a narrower try/except block.