-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path__init__.py
More file actions
71 lines (61 loc) · 2.19 KB
/
__init__.py
File metadata and controls
71 lines (61 loc) · 2.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Dispatch
========
Contains factory method for selecting dispatcher type based on Simvue Configuration
"""
import typing
import logging
if typing.TYPE_CHECKING:
from .base import DispatcherBaseClass
from threading import Event
from .queued import QueuedDispatcher
from .direct import DirectDispatcher
logger = logging.getLogger(__name__)
def Dispatcher(
mode: typing.Literal["direct", "queued"],
callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None],
object_types: list[str],
termination_trigger: "Event",
name: str | None = None,
**kwargs,
) -> "DispatcherBaseClass":
"""Returns instance of dispatcher based on configuration
Options are 'queued' which is the default and adds objects to a queue as well
as restricts the rate of dispatch, and 'prompt' which executes the callback
immediately
Parameters
----------
mode : typing.Literal['prompt', 'queued']
dispatcher mode
* prompt - execute callback immediately, do not queue.
* queue - execute callback on entries in a queue.
callback : typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None]
callback to be executed on each item provided
object_types : list[str]
categories, this is mainly used for creation of queues in a QueueDispatcher
termination_trigger : Event
event which triggers termination of the dispatcher
name : str | None, optional
name for the underlying thread, default None
Returns
-------
DispatcherBaseClass
either a DirectDispatcher or QueueDispatcher instance
"""
if mode == "direct":
logger.debug("Using direct dispatch for metric and queue sending")
return DirectDispatcher(
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
**kwargs,
)
else:
logger.debug("Using queued dispatch for metric and queue sending")
return QueuedDispatcher(
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
name=name,
**kwargs,
)