forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
171 lines (143 loc) · 5.77 KB
/
__init__.py
File metadata and controls
171 lines (143 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Distributed Task Queue."""
# :copyright: (c) 2016-2026 Asif Saif Uddin, celery core and individual
# contributors, All rights reserved.
# :copyright: (c) 2015-2016 Ask Solem. All rights reserved.
# :copyright: (c) 2012-2014 GoPivotal, Inc., All rights reserved.
# :copyright: (c) 2009 - 2012 Ask Solem and individual contributors,
# All rights reserved.
# :license: BSD (3 Clause), see LICENSE for more details.
import os
import re
import sys
from collections import namedtuple
# Lazy loading
from . import local
SERIES = 'dawn-chorus'
__version__ = '5.3.0b1'
__author__ = 'Ask Solem'
__contact__ = 'auvipy@gmail.com'
__homepage__ = 'https://docs.celeryq.dev/'
__docformat__ = 'restructuredtext'
__keywords__ = 'task job queue distributed messaging actor'
# -eof meta-
__all__ = (
'Celery', 'bugreport', 'shared_task', 'Task',
'current_app', 'current_task', 'maybe_signature',
'chain', 'chord', 'chunks', 'group', 'signature',
'xmap', 'xstarmap', 'uuid',
)
VERSION_BANNER = f'{__version__} ({SERIES})'
version_info_t = namedtuple('version_info_t', (
'major', 'minor', 'micro', 'releaselevel', 'serial',
))
# bumpversion can only search for {current_version}
# so we have to parse the version here.
_temp = re.match(
r'(\d+)\.(\d+).(\d+)(.+)?', __version__).groups()
VERSION = version_info = version_info_t(
int(_temp[0]), int(_temp[1]), int(_temp[2]), _temp[3] or '', '')
del _temp
del re
if os.environ.get('C_IMPDEBUG'): # pragma: no cover
import builtins
def debug_import(name, locals=None, globals=None,
fromlist=None, level=-1, real_import=builtins.__import__):
glob = globals or getattr(sys, 'emarfteg_'[::-1])(1).f_globals
importer_name = glob and glob.get('__name__') or 'unknown'
print(f'-- {importer_name} imports {name}')
return real_import(name, locals, globals, fromlist, level)
builtins.__import__ = debug_import
# This is never executed, but tricks static analyzers (PyDev, PyCharm,
# pylint, etc.) into knowing the types of these symbols, and what
# they contain.
STATICA_HACK = True
globals()['kcah_acitats'[::-1].upper()] = False
if STATICA_HACK: # pragma: no cover
from celery._state import current_app, current_task
from celery.app import shared_task
from celery.app.base import Celery
from celery.app.task import Task
from celery.app.utils import bugreport
from celery.canvas import chain, chord, chunks, group, maybe_signature, signature, subtask, xmap, xstarmap
from celery.utils import uuid
# Eventlet/gevent patching must happen before importing
# anything else, so these tools must be at top-level.
def _find_option_with_arg(argv, short_opts=None, long_opts=None):
"""Search argv for options specifying short and longopt alternatives.
Returns:
str: value for option found
Raises:
KeyError: if option not found.
"""
for i, arg in enumerate(argv):
if arg.startswith('-'):
if long_opts and arg.startswith('--'):
name, sep, val = arg.partition('=')
if name in long_opts:
return val if sep else argv[i + 1]
if short_opts and arg in short_opts:
return argv[i + 1]
raise KeyError('|'.join(short_opts or [] + long_opts or []))
def _patch_eventlet():
import eventlet.debug
eventlet.monkey_patch()
blockdetect = float(os.environ.get('EVENTLET_NOBLOCK', 0))
if blockdetect:
eventlet.debug.hub_blocking_detection(blockdetect, blockdetect)
def _patch_gevent():
import gevent.monkey
import gevent.signal
gevent.monkey.patch_all()
def maybe_patch_concurrency(argv=None, short_opts=None,
long_opts=None, patches=None):
"""Apply eventlet/gevent monkeypatches.
With short and long opt alternatives that specify the command line
option to set the pool, this makes sure that anything that needs
to be patched is completed as early as possible.
(e.g., eventlet/gevent monkey patches).
"""
argv = argv if argv else sys.argv
short_opts = short_opts if short_opts else ['-P']
long_opts = long_opts if long_opts else ['--pool']
patches = patches if patches else {'eventlet': _patch_eventlet,
'gevent': _patch_gevent}
try:
pool = _find_option_with_arg(argv, short_opts, long_opts)
except KeyError:
pass
else:
try:
patcher = patches[pool]
except KeyError:
pass
else:
patcher()
# set up eventlet/gevent environments ASAP
from celery import concurrency
if pool in concurrency.get_available_pool_names():
concurrency.get_implementation(pool)
# this just creates a new module, that imports stuff on first attribute
# access. This makes the library faster to use.
old_module, new_module = local.recreate_module( # pragma: no cover
__name__,
by_module={
'celery.app': ['Celery', 'bugreport', 'shared_task'],
'celery.app.task': ['Task'],
'celery._state': ['current_app', 'current_task'],
'celery.canvas': [
'Signature', 'chain', 'chord', 'chunks', 'group',
'signature', 'maybe_signature', 'subtask',
'xmap', 'xstarmap',
],
'celery.utils': ['uuid'],
},
__package__='celery', __file__=__file__,
__path__=__path__, __doc__=__doc__, __version__=__version__,
__author__=__author__, __contact__=__contact__,
__homepage__=__homepage__, __docformat__=__docformat__, local=local,
VERSION=VERSION, SERIES=SERIES, VERSION_BANNER=VERSION_BANNER,
version_info_t=version_info_t,
version_info=version_info,
maybe_patch_concurrency=maybe_patch_concurrency,
_find_option_with_arg=_find_option_with_arg,
)