forked from datafold/data-diff
-
Notifications
You must be signed in to change notification settings - Fork 1
fix: remove CPython internals from PriorityThreadPoolExecutor #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| import threading | ||
| from concurrent.futures import Future | ||
|
|
||
| import pytest | ||
|
|
||
| from data_diff.thread_utils import ( | ||
| PriorityThreadPoolExecutor, | ||
| ThreadedYielder, | ||
| _chain_future, | ||
| ) | ||
|
|
||
|
|
||
| class TestPriorityThreadPoolExecutor: | ||
| def test_priority_ordering(self): | ||
| """Higher-priority tasks execute before lower-priority ones.""" | ||
| gate = threading.Event() | ||
| results = [] | ||
|
|
||
| pool = PriorityThreadPoolExecutor(max_workers=1) | ||
|
|
||
| # Block the single worker so tasks queue up | ||
| pool.submit(lambda: gate.wait(), priority=0) | ||
|
|
||
| # Submit tasks with different priorities while worker is blocked | ||
| for p in [1, 3, 2]: | ||
| pool.submit(lambda p=p: results.append(p), priority=p) | ||
|
|
||
| # Release the gate — queued tasks run in priority order | ||
| gate.set() | ||
| pool.shutdown(wait=True) | ||
|
|
||
| assert results == [3, 2, 1] | ||
|
|
||
| def test_fifo_within_same_priority(self): | ||
| """Equal-priority tasks run in submission order (FIFO).""" | ||
| gate = threading.Event() | ||
| results = [] | ||
|
|
||
| pool = PriorityThreadPoolExecutor(max_workers=1) | ||
| pool.submit(lambda: gate.wait(), priority=0) | ||
|
|
||
| for i in range(5): | ||
| pool.submit(lambda i=i: results.append(i), priority=1) | ||
|
|
||
| gate.set() | ||
| pool.shutdown(wait=True) | ||
|
|
||
| assert results == [0, 1, 2, 3, 4] | ||
|
|
||
| def test_submit_returns_future_with_result(self): | ||
| """submit() returns a Future that resolves to the function's return value.""" | ||
| pool = PriorityThreadPoolExecutor(max_workers=2) | ||
| future = pool.submit(lambda: 42) | ||
| assert future.result(timeout=5) == 42 | ||
| pool.shutdown() | ||
|
|
||
| def test_submit_returns_future_with_exception(self): | ||
| """Exceptions in submitted functions propagate through the Future.""" | ||
| pool = PriorityThreadPoolExecutor(max_workers=2) | ||
| future = pool.submit(lambda: 1 / 0) | ||
| with pytest.raises(ZeroDivisionError): | ||
| future.result(timeout=5) | ||
| pool.shutdown() | ||
|
|
||
| def test_concurrent_submit(self): | ||
| """Submitting from multiple threads is safe.""" | ||
| pool = PriorityThreadPoolExecutor(max_workers=4) | ||
| results = [] | ||
| lock = threading.Lock() | ||
|
|
||
| def task(n): | ||
| with lock: | ||
| results.append(n) | ||
|
|
||
| threads = [] | ||
| for i in range(20): | ||
| t = threading.Thread(target=lambda i=i: pool.submit(task, i, priority=0)) | ||
| threads.append(t) | ||
| t.start() | ||
|
|
||
| for t in threads: | ||
| t.join() | ||
|
|
||
| pool.shutdown(wait=True) | ||
| assert sorted(results) == list(range(20)) | ||
|
|
||
| def test_shutdown_with_pending_work(self): | ||
| """Shutdown completes all pending work before returning.""" | ||
| results = [] | ||
| pool = PriorityThreadPoolExecutor(max_workers=1) | ||
|
|
||
| for i in range(10): | ||
| pool.submit(lambda i=i: results.append(i), priority=0) | ||
|
|
||
| pool.shutdown(wait=True) | ||
| assert sorted(results) == list(range(10)) | ||
|
|
||
| def test_no_cpython_internals_imported(self): | ||
| """Verify _WorkItem is not imported.""" | ||
| import data_diff.thread_utils as mod | ||
|
|
||
| assert not hasattr(mod, "_WorkItem") | ||
|
|
||
| def test_submit_forwards_args_and_kwargs(self): | ||
| """submit() correctly forwards positional and keyword arguments.""" | ||
| pool = PriorityThreadPoolExecutor(max_workers=1) | ||
| future = pool.submit(lambda a, b, c=None: (a, b, c), 1, 2, c=3) | ||
| assert future.result(timeout=5) == (1, 2, 3) | ||
| pool.shutdown() | ||
|
|
||
| def test_submit_after_shutdown_raises(self): | ||
| """submit() raises RuntimeError after shutdown() is called.""" | ||
| pool = PriorityThreadPoolExecutor(max_workers=1) | ||
| pool.shutdown() | ||
| with pytest.raises(RuntimeError, match="cannot submit after shutdown"): | ||
| pool.submit(lambda: None) | ||
|
|
||
| def test_shutdown_drains_high_priority_work(self): | ||
| """Sentinel does not preempt queued higher-priority work.""" | ||
| gate = threading.Event() | ||
| results = [] | ||
|
|
||
| pool = PriorityThreadPoolExecutor(max_workers=1) | ||
| pool.submit(lambda: gate.wait(), priority=0) | ||
|
|
||
| for i in range(5): | ||
| pool.submit(lambda i=i: results.append(i), priority=10) | ||
|
|
||
| gate.set() | ||
| pool.shutdown(wait=True) | ||
| assert sorted(results) == list(range(5)) | ||
|
|
||
|
|
||
| class TestChainFuture: | ||
| def test_propagates_result(self): | ||
| """Chains result from source to dest.""" | ||
| source = Future() | ||
| dest = Future() | ||
| source.set_result(42) | ||
| _chain_future(source, dest) | ||
| assert dest.result() == 42 | ||
|
|
||
| def test_propagates_exception(self): | ||
| """Chains exception from source to dest.""" | ||
| source = Future() | ||
| dest = Future() | ||
| source.set_exception(ValueError("oops")) | ||
| _chain_future(source, dest) | ||
| with pytest.raises(ValueError, match="oops"): | ||
| dest.result() | ||
|
|
||
| def test_skips_cancelled_dest(self): | ||
| """Does not raise if dest was already cancelled.""" | ||
| source = Future() | ||
| dest = Future() | ||
| dest.cancel() | ||
| source.set_result(42) | ||
| _chain_future(source, dest) # should not raise | ||
|
|
||
|
|
||
| class TestThreadedYielder: | ||
| def test_basic_yield(self): | ||
| """ThreadedYielder collects results from submitted functions.""" | ||
| ty = ThreadedYielder(max_workers=2) | ||
| ty.submit(lambda: [1, 2, 3]) | ||
| ty.submit(lambda: [4, 5, 6]) | ||
|
|
||
| result = list(ty) | ||
| assert sorted(result) == [1, 2, 3, 4, 5, 6] | ||
|
|
||
| def test_priority_behavior(self): | ||
| """Higher-priority iterators get scheduled first.""" | ||
| gate = threading.Event() | ||
| ty = ThreadedYielder(max_workers=1) | ||
|
|
||
| # Block the worker | ||
| def wait_gate(): | ||
| gate.wait() | ||
| return [] | ||
|
|
||
| ty.submit(wait_gate, priority=0) | ||
|
|
||
| # Queue tasks with different priorities | ||
| ty.submit(lambda: ["low"], priority=1) | ||
| ty.submit(lambda: ["high"], priority=3) | ||
| ty.submit(lambda: ["mid"], priority=2) | ||
|
|
||
| gate.set() | ||
| result = list(ty) | ||
| # High-priority tasks should execute first | ||
| assert result == ["high", "mid", "low"] | ||
|
|
||
| def test_yield_list_mode(self): | ||
| """yield_list=True appends entire results rather than extending.""" | ||
| ty = ThreadedYielder(max_workers=1, yield_list=True) | ||
| ty.submit(lambda: [1, 2, 3]) | ||
|
|
||
| result = list(ty) | ||
| assert result == [[1, 2, 3]] | ||
|
|
||
| def test_exception_propagation(self): | ||
| """Exceptions in submitted functions propagate through iteration.""" | ||
| ty = ThreadedYielder(max_workers=1) | ||
| ty.submit(lambda: (_ for _ in ()).throw(ValueError("boom"))) | ||
|
|
||
| with pytest.raises(ValueError, match="boom"): | ||
| list(ty) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After
shutdown()stops the dispatcher thread,submit()still enqueues work unconditionally and returns a freshFuture. Because no thread consumes_queueanymore, that future can remain pending forever and callers waiting onresult()will hang. The previousThreadPoolExecutor-based behavior raised on post-shutdown submissions, so this is a regression for any late producer thread.Useful? React with 👍 / 👎.