Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ Valid values are ``'thread'`` (default) to initialise a

app.config['EXECUTOR_TYPE'] = 'thread'

If you want to provide your own PoolExecutor class, set ``EXECUTOR_TYPE`` to ``'custom'`` and set ``EXECUTOR_POOL_CLASS`` in your app configuration::

app.config['EXECUTOR_TYPE'] = 'custom'
app.config['EXECUTOR_POOL_CLASS'] = UserPoolExecutor

Note: the user-provided PoolExecutor must implement all the relevant methods from :class:`~concurrent.futures.Executor`

To define the number of worker threads for a :class:`~concurrent.futures.ThreadPoolExecutor` or the
number of worker processes for a :class:`~concurrent.futures.ProcessPoolExecutor`, set
``EXECUTOR_MAX_WORKERS`` in your app configuration. Valid values are any integer or ``None`` (default)
Expand All @@ -54,12 +61,31 @@ to let :py:mod:`concurrent.futures` pick defaults for you::

If multiple executors are needed, :class:`flask_executor.Executor` can be initialised with a ``name``
parameter. Named executors will look for configuration variables prefixed with the specified ``name``
value, uppercased:
value, uppercased::

app.config['CUSTOM_EXECUTOR_TYPE'] = 'thread'
app.config['CUSTOM_EXECUTOR_MAX_WORKERS'] = 5
executor = Executor(app, name='custom')

gevent
^^^^^^
When using `gevent <http://www.gevent.org/>`_ the original :class:`~concurrent.futures.ThreadPoolExecutor` is being patched to behave cooperatively inside the event loop.

In some cases you want to run your workloads inside a `thread` instead of inside a `greenlet`, in such cases you want to configure your app using `gevent.threadpool.ThreadPoolExecutor <https://www.gevent.org/api/gevent.threadpool.html#gevent.threadpool.ThreadPoolExecutor>`_::

app.config['EXECUTOR_TYPE'] = 'custom'
app.config['EXECUTOR_POOL_CLASS'] = gevent.threadpool.ThreadPoolExecutor

Note: be aware that some modules do not behave correctly when they are patched and used in a submitted job when using `gevent.threadpool.ThreadPoolExecutor <https://www.gevent.org/api/gevent.threadpool.html#gevent.threadpool.ThreadPoolExecutor>`_, for example the following code will NOT work::

executor.submit(
subprocess.check_output,
['uname', '-p'],
stderr=subprocess.DEVNULL,
text=True,
)

TypeError: child watchers are only available on the default loop

Basic Usage
-----------
Expand Down
5 changes: 5 additions & 0 deletions flask_executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(self, app=None, name=''):
self.EXECUTOR_FUTURES_MAX_LENGTH = prefix + 'EXECUTOR_FUTURES_MAX_LENGTH'
self.EXECUTOR_PROPAGATE_EXCEPTIONS = prefix + 'EXECUTOR_PROPAGATE_EXCEPTIONS'
self.EXECUTOR_PUSH_APP_CONTEXT = prefix + 'EXECUTOR_PUSH_APP_CONTEXT'
self.EXECUTOR_POOL_CLASS = prefix + 'EXECUTOR_POOL_CLASS'

if app is not None:
self.init_app(app)
Expand Down Expand Up @@ -115,6 +116,10 @@ def _make_executor(self, app):
_executor = concurrent.futures.ThreadPoolExecutor
elif executor_type == 'process':
_executor = concurrent.futures.ProcessPoolExecutor
elif executor_type == 'custom':
_executor = app.config.setdefault(self.EXECUTOR_POOL_CLASS, None)
if not _executor:
raise ValueError("Missing Executor pool class")
else:
raise ValueError("{} is not a valid executor type.".format(executor_type))
return _executor(max_workers=executor_max_workers)
Expand Down
5 changes: 4 additions & 1 deletion flask_executor/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ def __getattribute__(self, attr):
if attr in cls_dict or attr in inst_dict or attr in super_cls_dict:
return object.__getattribute__(self, attr)
target_obj = object.__getattribute__(self, PROXIED_OBJECT)
return object.__getattribute__(target_obj, attr)
try:
return object.__getattribute__(target_obj, attr)
except AttributeError:
return getattr(target_obj, attr)

def __repr__(self):
class_name = object.__getattribute__(self, '__class__').__name__
Expand Down
33 changes: 33 additions & 0 deletions tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
import time
from concurrent.futures import _base
from threading import local

import pytest
Expand Down Expand Up @@ -38,6 +39,15 @@ def fail():
print(hello)


class CustomThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
class _Future(_base.Future):
def __getattr__(self, item):
return lambda: True

def submit(self, fn, *args, **kwargs):
return self._Future()


def test_init(app):
executor = Executor(app)
assert 'executor' in app.extensions
Expand Down Expand Up @@ -67,6 +77,20 @@ def test_process_executor_init(default_app):
assert isinstance(executor, concurrent.futures.ProcessPoolExecutor)


def test_custom_executor_init(default_app):
default_app.config['EXECUTOR_TYPE'] = 'custom'
default_app.config['EXECUTOR_POOL_CLASS'] = CustomThreadPoolExecutor
executor = Executor(default_app)
assert isinstance(executor._self, CustomThreadPoolExecutor)
assert isinstance(executor, CustomThreadPoolExecutor)


def test_invalid_process_custom_init(default_app):
default_app.config['EXECUTOR_TYPE'] = 'custom'
with pytest.raises(ValueError):
_ = Executor(default_app)


def test_default_executor_init(default_app):
executor = Executor(default_app)
assert isinstance(executor._self, concurrent.futures.ThreadPoolExecutor)
Expand Down Expand Up @@ -319,6 +343,15 @@ def decorated(n):
assert future.result() == fib(5)


def test_custom_executor_getarrt(default_app):
default_app.config['EXECUTOR_TYPE'] = 'custom'
default_app.config['EXECUTOR_POOL_CLASS'] = CustomThreadPoolExecutor
executor = Executor(default_app)
with default_app.test_request_context(''):
executor.submit_stored('fibonacci', fib, 35)
assert executor.futures.custom_func('fibonacci')


thread_local = local()


Expand Down