fix: remove CPython internals from PriorityThreadPoolExecutor#19
fix: remove CPython internals from PriorityThreadPoolExecutor#19
Conversation
Replace the _WorkItem/_work_queue subclass hack with a dispatcher-wrapper pattern: a PriorityQueue feeds work in priority order to a standard ThreadPoolExecutor via a daemon thread. This eliminates all CPython internal dependencies and fixes compatibility with Python 3.14+. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b538ba4d97
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| def submit(self, fn, /, *args, priority: int = 0, **kwargs) -> Future: | ||
| proxy = Future() | ||
| self._queue.put((-priority, self._counter(), (fn, args, kwargs, proxy))) | ||
| return proxy |
There was a problem hiding this comment.
Reject submit calls after shutdown
After shutdown() stops the dispatcher thread, submit() still enqueues work unconditionally and returns a fresh Future. Because no thread consumes _queue anymore, that future can remain pending forever and callers waiting on result() will hang. The previous ThreadPoolExecutor-based behavior raised on post-shutdown submissions, so this is a regression for any late producer thread.
Useful? React with 👍 / 👎.
data_diff/thread_utils.py
Outdated
| inner_future = self._inner.submit(fn, *args, **kwargs) | ||
| inner_future.add_done_callback(lambda f, p=proxy: _chain_future(f, p)) |
There was a problem hiding this comment.
Skip dispatch for already-cancelled proxy futures
The dispatcher submits every queued job to _inner without checking whether the returned proxy future was cancelled first. If a caller cancels before dispatch, the task still runs, and _chain_future then attempts to set a result/exception on a cancelled future, which can raise InvalidStateError in the callback path. This breaks expected cancellation semantics for expensive or side-effecting tasks.
Useful? React with 👍 / 👎.
- Guard _chain_future against cancelled dest and internal exceptions
- Add try/except in dispatcher to propagate errors to proxy futures
- Reject submit() after shutdown() with RuntimeError
- Use float('inf') sentinel priority to never preempt queued work
- Add 30s timeout to dispatcher join to prevent deadlock on crash
- Add tests for all new error paths
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Initialize proxy = None before try block and check `is not None` instead of using `"proxy" in dir()` which doesn't reliably reflect local variables and retains stale references across loop iterations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
_WorkItem/_work_queuesubclass hack inPriorityThreadPoolExecutorwith a clean dispatcher-wrapper pattern — aPriorityQueuefeeds work in priority order to a standardThreadPoolExecutorvia a daemon thread_WorkItem,_work_queue), fixing compatibility with Python 3.14+ where_WorkItemwas refactoredThreadedYielderintegrationCloses #4
Test plan
uv run python -m pytest tests/test_thread_utils.py -v— 11/11 new tests passuv run python -m pytest tests/test_query.py tests/test_diff_tables.py::TestUtils -v— existing tests unaffecteduv run python -c "from data_diff.thread_utils import PriorityThreadPoolExecutor"— no_WorkItemimport🤖 Generated with Claude Code