-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbounded_blocking_queue.py
More file actions
77 lines (67 loc) · 2.78 KB
/
bounded_blocking_queue.py
File metadata and controls
77 lines (67 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""Implement a bounded blocking queue.
Notes:
* Only uses a Lock and Conditions, not Threads
* Methods: enqueue (put), dequeue (get), size, task_done, join
* Each method acquires a lock at the beginning and releases at the end
* I could just acquire/release the lock directly rather than through the condition variables too
* join method is simpler than task_done method, but both guarantee the lock is released on exit, even if an exception is raised
Sources:
* https://github.com/python/cpython/blob/8d21aa21f2cbc6d50aab3f420bb23be1d081dac4/Lib/Queue.py#L200
* https://github.com/python/cpython/blob/8d21aa21f2cbc6d50aab3f420bb23be1d081dac4/Lib/test/test_queue.py
* https://leetcode.com/problems/design-bounded-blocking-queue/
"""
import threading
import collections
class BoundedBlockingQueue:
    """Thread-safe FIFO queue with an optional capacity bound.

    Producers calling ``enqueue`` block while the queue is full; consumers
    calling ``dequeue`` block while it is empty.  A counter of unfinished
    tasks backs ``task_done`` / ``join``.

    The three condition variables all wrap the same lock, so entering any one
    of them (or ``self.lock`` directly) excludes every other method.  Every
    method holds the lock through a ``with`` block so it is released even if
    an exception is raised while the lock is held.
    """

    def __init__(self, capacity=0):
        """Create an empty queue.

        Args:
            capacity: Maximum number of queued elements.  A value ``<= 0``
                disables the bound (``enqueue`` never blocks).
        """
        self.capacity = capacity
        # Number of enqueued elements not yet acknowledged via task_done().
        self.unfinished_tasks = 0
        self.queue = collections.deque()
        self.lock = threading.Lock()
        # All these Condition objects share the same lock: acquiring any one
        # of them acquires the single underlying lock.
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)
        self.all_tasks_done = threading.Condition(self.lock)

    def enqueue(self, element: int) -> None:
        """Append ``element``, blocking while a bounded queue is full.

        Also increments the unfinished-task counter and wakes one consumer
        waiting in ``dequeue``.
        """
        with self.not_full:
            if self.capacity > 0:
                # ``>=`` rather than ``==`` so the bound still holds if
                # ``capacity`` is lowered while elements are queued.
                while len(self.queue) >= self.capacity:
                    self.not_full.wait()  # woken by the notify in dequeue
            self.queue.append(element)
            self.unfinished_tasks += 1
            self.not_empty.notify()  # wakes one thread waiting in dequeue

    def dequeue(self) -> int:
        """Remove and return the oldest element, blocking while empty.

        Wakes one producer waiting in ``enqueue``.
        """
        with self.not_empty:
            while not self.queue:
                self.not_empty.wait()  # woken by the notify in enqueue
            element = self.queue.popleft()
            self.not_full.notify()  # wakes one thread waiting in enqueue
            return element

    def size(self) -> int:
        """Return the number of elements currently queued."""
        with self.lock:
            return len(self.queue)

    def task_done(self) -> None:
        """Mark one previously enqueued element as fully processed.

        When the unfinished-task counter reaches zero, every thread blocked
        in ``join`` is woken.

        Raises:
            ValueError: If called more times than elements were enqueued.
                The counter is left unchanged in that case.
        """
        with self.all_tasks_done:
            remaining = self.unfinished_tasks - 1
            if remaining < 0:
                raise ValueError('task_done() called too many times')
            if remaining == 0:
                # Notify all threads that have called join.
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = remaining

    def join(self) -> None:
        """Block until every enqueued element has been marked via task_done."""
        with self.all_tasks_done:
            while self.unfinished_tasks:
                # Woken by notify_all in task_done when the count hits zero.
                self.all_tasks_done.wait()