From e1ae62505875ccde4f123726cadf4a7225a028cc Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Tue, 6 Jan 2026 00:37:52 -0500
Subject: [PATCH 1/4] Used PyEvents in subinterpreter queues to wait for queue
full and queue empty conditions.
---
Lib/concurrent/interpreters/_queues.py | 4 ++--
Modules/_interpqueuesmodule.c | 14 ++++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py
index ee159d7de63827..69eb234a9c4fb7 100644
--- a/Lib/concurrent/interpreters/_queues.py
+++ b/Lib/concurrent/interpreters/_queues.py
@@ -226,7 +226,7 @@ def put(self, obj, block=True, timeout=None, *,
except QueueFull:
if timeout is not None and time.time() >= end:
raise # re-raise
- time.sleep(_delay)
+ # time.sleep(_delay)
else:
break
@@ -261,7 +261,7 @@ def get(self, block=True, timeout=None, *,
except QueueEmpty:
if timeout is not None and time.time() >= end:
raise # re-raise
- time.sleep(_delay)
+ # time.sleep(_delay)
else:
break
if unboundop is not None:
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index 417c5fbcee2645..3d83d013b346ec 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -7,6 +7,7 @@
#include "Python.h"
#include "pycore_crossinterp.h" // _PyXIData_t
+#include "pycore_lock.h" // PyEvent
#define REGISTERS_HEAP_TYPES
#define HAS_FALLBACK
@@ -532,6 +533,8 @@ typedef struct _queue {
xidata_fallback_t fallback;
int unboundop;
} defaults;
+ PyEvent queue_space_available;
+ PyEvent queue_has_item;
} _queue;
static int
@@ -549,6 +552,8 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
.maxsize = maxsize,
},
.defaults = defaults,
+ .queue_space_available = (PyEvent){1},
+ .queue_has_item = (PyEvent){0}
};
return 0;
}
@@ -642,6 +647,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
}
if (queue->items.count >= maxsize) {
_queue_unlock(queue);
+ queue->queue_space_available = (PyEvent){0};
return ERR_QUEUE_FULL;
}
@@ -661,6 +667,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
queue->items.last = item;
_queue_unlock(queue);
+ queue->queue_has_item = (PyEvent){1};
return 0;
}
@@ -676,6 +683,7 @@ _queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
_queueitem *item = queue->items.first;
if (item == NULL) {
_queue_unlock(queue);
+ queue->queue_has_item = (PyEvent){0};
return ERR_QUEUE_EMPTY;
}
queue->items.first = item->next;
@@ -1136,6 +1144,9 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
}
assert(queue != NULL);
+ // Wait for the queue to have space
+ PyEvent_Wait(&queue->queue_space_available);
+
// Convert the object to cross-interpreter data.
_PyXIData_t *xidata = _PyXIData_New();
if (xidata == NULL) {
@@ -1182,6 +1193,9 @@ queue_get(_queues *queues, int64_t qid,
// Past this point we are responsible for releasing the mutex.
assert(queue != NULL);
+ // Wait for the queue to have some value
+ PyEvent_Wait(&queue->queue_has_item);
+
// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
err = _queue_next(queue, &data, p_unboundop);
From 2cea9411fc482fb48b658329a135558e44c68423 Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Wed, 7 Jan 2026 00:20:24 -0500
Subject: [PATCH 2/4] Changed event object names
---
Lib/concurrent/interpreters/_queues.py | 7 +------
Modules/_interpqueuesmodule.c | 18 +++++++++---------
2 files changed, 10 insertions(+), 15 deletions(-)
diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py
index 69eb234a9c4fb7..9e765656cbafcb 100644
--- a/Lib/concurrent/interpreters/_queues.py
+++ b/Lib/concurrent/interpreters/_queues.py
@@ -172,7 +172,6 @@ def qsize(self):
def put(self, obj, block=True, timeout=None, *,
unbounditems=None,
- _delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
@@ -226,7 +225,6 @@ def put(self, obj, block=True, timeout=None, *,
except QueueFull:
if timeout is not None and time.time() >= end:
raise # re-raise
- # time.sleep(_delay)
else:
break
@@ -237,9 +235,7 @@ def put_nowait(self, obj, *, unbounditems=None):
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)
- def get(self, block=True, timeout=None, *,
- _delay=10 / 1000, # 10 milliseconds
- ):
+ def get(self, block=True, timeout=None, *):
"""Return the next object from the queue.
If "block" is true, this blocks while the queue is empty.
@@ -261,7 +257,6 @@ def get(self, block=True, timeout=None, *,
except QueueEmpty:
if timeout is not None and time.time() >= end:
raise # re-raise
- # time.sleep(_delay)
else:
break
if unboundop is not None:
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index 3d83d013b346ec..a905cd059651a2 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -533,8 +533,8 @@ typedef struct _queue {
xidata_fallback_t fallback;
int unboundop;
} defaults;
- PyEvent queue_space_available;
- PyEvent queue_has_item;
+ PyEvent space_available;
+ PyEvent has_item;
} _queue;
static int
@@ -552,8 +552,8 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
.maxsize = maxsize,
},
.defaults = defaults,
- .queue_space_available = (PyEvent){1},
- .queue_has_item = (PyEvent){0}
+ .space_available = (PyEvent){1},
+ .has_item = (PyEvent){0}
};
return 0;
}
@@ -647,7 +647,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
}
if (queue->items.count >= maxsize) {
_queue_unlock(queue);
- queue->queue_space_available = (PyEvent){0};
+ queue->space_available = (PyEvent){0};
return ERR_QUEUE_FULL;
}
@@ -667,7 +667,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
queue->items.last = item;
_queue_unlock(queue);
- queue->queue_has_item = (PyEvent){1};
+ queue->has_item = (PyEvent){1};
return 0;
}
@@ -683,7 +683,7 @@ _queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
_queueitem *item = queue->items.first;
if (item == NULL) {
_queue_unlock(queue);
- queue->queue_has_item = (PyEvent){0};
+ queue->has_item = (PyEvent){0};
return ERR_QUEUE_EMPTY;
}
queue->items.first = item->next;
@@ -1145,7 +1145,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
assert(queue != NULL);
// Wait for the queue to have space
- PyEvent_Wait(&queue->queue_space_available);
+ PyEvent_Wait(&queue->space_available);
// Convert the object to cross-interpreter data.
_PyXIData_t *xidata = _PyXIData_New();
@@ -1194,7 +1194,7 @@ queue_get(_queues *queues, int64_t qid,
assert(queue != NULL);
// Wait for the queue to have some value
- PyEvent_Wait(&queue->queue_has_item);
+ PyEvent_Wait(&queue->has_item);
// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
From ecab23066a64d6a296429c9f6a3021625073a04a Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Fri, 9 Jan 2026 00:39:02 -0500
Subject: [PATCH 3/4] Added block keywords queue.get and queue.put
---
Lib/concurrent/interpreters/_queues.py | 4 +--
Modules/_interpqueuesmodule.c | 22 +++++++++------
Modules/clinic/_interpqueuesmodule.c.h | 39 ++++++++++++++++++--------
3 files changed, 43 insertions(+), 22 deletions(-)
diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py
index 9e765656cbafcb..bef9c2bc26a6f5 100644
--- a/Lib/concurrent/interpreters/_queues.py
+++ b/Lib/concurrent/interpreters/_queues.py
@@ -233,7 +233,7 @@ def put_nowait(self, obj, *, unbounditems=None):
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
- _queues.put(self._id, obj, unboundop)
+ _queues.put(self._id, obj, unboundop, False)
def get(self, block=True, timeout=None, *):
"""Return the next object from the queue.
@@ -271,7 +271,7 @@ def get_nowait(self):
is the same as get().
"""
try:
- obj, unboundop = _queues.get(self._id)
+ obj, unboundop = _queues.get(self._id, False)
except QueueEmpty:
raise # re-raise
if unboundop is not None:
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index a905cd059651a2..1197a6cb1ba0a9 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -1132,7 +1132,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
- xidata_fallback_t fallback)
+ xidata_fallback_t fallback, int block)
{
PyThreadState *tstate = PyThreadState_Get();
@@ -1145,7 +1145,9 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
assert(queue != NULL);
// Wait for the queue to have space
- PyEvent_Wait(&queue->space_available);
+ if (block == 1) {
+ PyEvent_Wait(&queue->space_available);
+ }
// Convert the object to cross-interpreter data.
_PyXIData_t *xidata = _PyXIData_New();
@@ -1179,7 +1181,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid,
- PyObject **res, int *p_unboundop)
+ PyObject **res, int *p_unboundop, int block)
{
int err;
*res = NULL;
@@ -1194,7 +1196,9 @@ queue_get(_queues *queues, int64_t qid,
assert(queue != NULL);
// Wait for the queue to have some value
- PyEvent_Wait(&queue->has_item);
+ if (block == 1) {
+ PyEvent_Wait(&queue->has_item);
+ }
// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
@@ -1627,13 +1631,14 @@ _interpqueues.put
obj: object
unboundop as unboundarg: int = -1
fallback as fallbackarg: int = -1
+ block: bool = True
Add the object's data to the queue.
[clinic start generated code]*/
static PyObject *
_interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
- int unboundarg, int fallbackarg)
+ int unboundarg, int fallbackarg, int block)
/*[clinic end generated code: output=2e0b31c6eaec29c9 input=4906550ab5c73be3]*/
{
struct _queuedefaults defaults = {-1, -1};
@@ -1653,7 +1658,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
}
/* Queue up the object. */
- int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
+ int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback, block);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, module, qid)) {
return NULL;
@@ -1665,6 +1670,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
/*[clinic input]
_interpqueues.get
qid: qidarg
+ block: bool = True
Return the (object, unbound op) from the front of the queue.
@@ -1672,12 +1678,12 @@ If there is nothing to receive then raise QueueEmpty.
[clinic start generated code]*/
static PyObject *
-_interpqueues_get_impl(PyObject *module, int64_t qid)
+_interpqueues_get_impl(PyObject *module, int64_t qid, int block)
/*[clinic end generated code: output=b0988a0e29194f05 input=c5bccbc409ad0190]*/
{
PyObject *obj = NULL;
int unboundop = 0;
- int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
+ int err = queue_get(&_globals.queues, qid, &obj, &unboundop, block);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, module, qid)) {
return NULL;
diff --git a/Modules/clinic/_interpqueuesmodule.c.h b/Modules/clinic/_interpqueuesmodule.c.h
index 3f08a0cb6d36ab..eb21267fc355a8 100644
--- a/Modules/clinic/_interpqueuesmodule.c.h
+++ b/Modules/clinic/_interpqueuesmodule.c.h
@@ -196,7 +196,7 @@ PyDoc_STRVAR(_interpqueues_put__doc__,
static PyObject *
_interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
- int unboundarg, int fallbackarg);
+ int unboundarg, int fallbackarg, int block);
static PyObject *
_interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
@@ -204,7 +204,7 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
- #define NUM_KEYWORDS 4
+ #define NUM_KEYWORDS 5
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
@@ -213,7 +213,7 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_hash = -1,
- .ob_item = { &_Py_ID(qid), &_Py_ID(obj), &_Py_ID(unboundop), &_Py_ID(fallback), },
+ .ob_item = { &_Py_ID(qid), &_Py_ID(obj), &_Py_ID(unboundop), &_Py_ID(fallback), &_Py_ID(block)},
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
@@ -222,7 +222,7 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
- static const char * const _keywords[] = {"qid", "obj", "unboundop", "fallback", NULL};
+ static const char * const _keywords[] = {"qid", "obj", "unboundop", "fallback", "block", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "put",
@@ -235,9 +235,10 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
PyObject *obj;
int unboundarg = -1;
int fallbackarg = -1;
+ int block = 1;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
- /*minpos*/ 2, /*maxpos*/ 4, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
+ /*minpos*/ 2, /*maxpos*/ 5, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
if (!args) {
goto exit;
}
@@ -261,8 +262,14 @@ _interpqueues_put(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
if (fallbackarg == -1 && PyErr_Occurred()) {
goto exit;
}
+ if (args[4]) {
+ block = PyLong_AsInt(args[4]);
+ if (block == 1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ }
skip_optional_pos:
- return_value = _interpqueues_put_impl(module, qid, obj, unboundarg, fallbackarg);
+ return_value = _interpqueues_put_impl(module, qid, obj, unboundarg, fallbackarg, block);
exit:
return return_value;
@@ -280,7 +287,7 @@ PyDoc_STRVAR(_interpqueues_get__doc__,
{"get", _PyCFunction_CAST(_interpqueues_get), METH_FASTCALL|METH_KEYWORDS, _interpqueues_get__doc__},
static PyObject *
-_interpqueues_get_impl(PyObject *module, int64_t qid);
+_interpqueues_get_impl(PyObject *module, int64_t qid, int block);
static PyObject *
_interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
@@ -288,7 +295,7 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
- #define NUM_KEYWORDS 1
+ #define NUM_KEYWORDS 2
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
@@ -297,7 +304,7 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_hash = -1,
- .ob_item = { &_Py_ID(qid), },
+ .ob_item = { &_Py_ID(qid), &_Py_ID(block)},
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
@@ -306,7 +313,7 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
- static const char * const _keywords[] = {"qid", NULL};
+ static const char * const _keywords[] = {"qid", "block", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "get",
@@ -315,16 +322,24 @@ _interpqueues_get(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyO
#undef KWTUPLE
PyObject *argsbuf[1];
int64_t qid;
+ int block = 1;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
- /*minpos*/ 1, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
+ /*minpos*/ 1, /*maxpos*/ 2, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
if (!args) {
goto exit;
}
if (!qidarg_converter(args[0], &qid)) {
goto exit;
}
- return_value = _interpqueues_get_impl(module, qid);
+
+ if (args[1]) {
+ block = PyLong_AsInt(args[1]);
+ if (block == 1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ }
+ return_value = _interpqueues_get_impl(module, qid, block);
exit:
return return_value;
From 8bda61c554a94d32acfb76ad46b2cf9865397780 Mon Sep 17 00:00:00 2001
From: Prithviraj Chaudhuri
Date: Fri, 9 Jan 2026 01:12:30 -0500
Subject: [PATCH 4/4] Updated queue arguments
---
Lib/concurrent/interpreters/_queues.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py
index bef9c2bc26a6f5..0272fcd66dbac9 100644
--- a/Lib/concurrent/interpreters/_queues.py
+++ b/Lib/concurrent/interpreters/_queues.py
@@ -235,7 +235,7 @@ def put_nowait(self, obj, *, unbounditems=None):
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop, False)
- def get(self, block=True, timeout=None, *):
+ def get(self, block=True, timeout=None):
"""Return the next object from the queue.
If "block" is true, this blocks while the queue is empty.