Skip to content

fix: remove CPython internals from PriorityThreadPoolExecutor#19

Merged
dtsong merged 3 commits intomasterfrom
fix/priority-thread-pool-cpython-dependency
Mar 2, 2026
Merged

fix: remove CPython internals from PriorityThreadPoolExecutor#19
dtsong merged 3 commits intomasterfrom
fix/priority-thread-pool-cpython-dependency

Conversation

@dtsong
Copy link
Owner

@dtsong dtsong commented Mar 2, 2026

Summary

  • Replaces the _WorkItem/_work_queue subclass hack in PriorityThreadPoolExecutor with a clean dispatcher-wrapper pattern — a PriorityQueue feeds work in priority order to a standard ThreadPoolExecutor via a daemon thread
  • Eliminates all CPython internal dependencies (_WorkItem, _work_queue), fixing compatibility with Python 3.14+ where _WorkItem was refactored
  • Adds 11 unit tests covering priority ordering, FIFO, concurrency, shutdown, and ThreadedYielder integration

Closes #4

Test plan

  • uv run python -m pytest tests/test_thread_utils.py -v — 11/11 new tests pass
  • uv run python -m pytest tests/test_query.py tests/test_diff_tables.py::TestUtils -v — existing tests unaffected
  • uv run python -c "from data_diff.thread_utils import PriorityThreadPoolExecutor" — no _WorkItem import

🤖 Generated with Claude Code

Replace the _WorkItem/_work_queue subclass hack with a dispatcher-wrapper
pattern: a PriorityQueue feeds work in priority order to a standard
ThreadPoolExecutor via a daemon thread. This eliminates all CPython
internal dependencies and fixes compatibility with Python 3.14+.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b538ba4d97

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +48 to +51
def submit(self, fn, /, *args, priority: int = 0, **kwargs) -> Future:
proxy = Future()
self._queue.put((-priority, self._counter(), (fn, args, kwargs, proxy)))
return proxy

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reject submit calls after shutdown

After shutdown() stops the dispatcher thread, submit() still enqueues work unconditionally and returns a fresh Future. Because no thread consumes _queue anymore, that future can remain pending forever and callers waiting on result() will hang. The previous ThreadPoolExecutor-based behavior raised on post-shutdown submissions, so this is a regression for any late producer thread.

Useful? React with 👍 / 👎.

Comment on lines +45 to +46
inner_future = self._inner.submit(fn, *args, **kwargs)
inner_future.add_done_callback(lambda f, p=proxy: _chain_future(f, p))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Skip dispatch for already-cancelled proxy futures

The dispatcher submits every queued job to _inner without checking whether the returned proxy future was cancelled first. If a caller cancels before dispatch, the task still runs, and _chain_future then attempts to set a result/exception on a cancelled future, which can raise InvalidStateError in the callback path. This breaks expected cancellation semantics for expensive or side-effecting tasks.

Useful? React with 👍 / 👎.

dtsong and others added 2 commits March 1, 2026 23:21
- Guard _chain_future against cancelled dest and internal exceptions
- Add try/except in dispatcher to propagate errors to proxy futures
- Reject submit() after shutdown() with RuntimeError
- Use float('inf') sentinel priority to never preempt queued work
- Add 30s timeout to dispatcher join to prevent deadlock on crash
- Add tests for all new error paths

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Initialize proxy = None before try block and check `is not None`
instead of using `"proxy" in dir()` which doesn't reliably reflect
local variables and retains stale references across loop iterations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@dtsong dtsong merged commit 13963a4 into master Mar 2, 2026
6 checks passed
@dtsong dtsong deleted the fix/priority-thread-pool-cpython-dependency branch March 2, 2026 07:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PriorityThreadPoolExecutor relies on CPython implementation details

1 participant