一个高性能,结构简洁,依赖于 Python内置库asyncio 的事件系统, 设计灵感来自Graia BroadcastControl。
项目仍处于开发阶段,部分内容可能会有较大改变
pip install arclet-letodereaimport asyncio
import arclet.letoderea as le
@le.make_event
class TestEvent:
name: str
@le.on_global
async def test_subscriber(name: str):
print(name)
async def main():
await le.publish(TestEvent("Letoderea"))
asyncio.run(main())import asyncio
import arclet.letoderea as le
@le.make_event
class TestEvent:
name: str
@le.depends()
async def get_msg(event):
return f"Hello, {event.name}"
@le.on(TestEvent)
async def test_subscriber(msg: str = get_msg):
print(msg)
async def main():
await le.publish(TestEvent("Letoderea"))
asyncio.run(main())import asyncio
import random
import arclet.letoderea as le
@le.make_event
class Event:
name: str
@le.make_event(name="rand")
class RandomData:
seed: int
def check_result(self, value) -> le.Result[float] | None: ...
@le.on(RandomData)
def random_subscriber(seed: int):
return random.Random(seed).random()
@le.on(Event)
async def event_subscriber(event: Event):
print(f"Event: {event.name}")
result = await le.post(RandomData(42))
print(f"Random: {result.value}")
async def main():
await le.publish(Event("Letoderea"))
asyncio.run(main())import asyncio
import arclet.letoderea as le
@le.make_event
class Event:
name: str
flag: bool = False
@le.on_global
@le.enter_if(le.deref(Event).name == "Letoderea")
async def sub_if_letoderea():
...
@le.on(Event).if_(le.deref(Event).flag & (le.deref(Event).name != "Letoderea"))
async def sub_if_not_letoderea():
...
async def main():
await le.publish(Event("Letoderea"))
await le.publish(Event("OtherEvent", True))
asyncio.run(main())- 事件(Event):被发布并分发给订阅者的对象。
- 订阅者(Subscriber):注册到事件系统中的函数或协程函数。
- 上下文(Contexts):单次分发过程中的共享数据容器(
dict子类)。 - Provider:参数注入器,在调用订阅者前解析参数。
- 作用域(Scope):订阅关系的容器,用于隔离和组织分发流程。
- 事件可以是任意对象,满足以下任一条件即可参与分发:
- 实现
gather(contexts)异步方法; - 通过
define(..., supplier=...)提供采集逻辑; - 通过
@gather注册 supplier 方法。
- 实现
gather用于将事件相关数据写入Contexts。- 事件可携带
Provider,用于为订阅者参数提供注入能力。 - 订阅子类事件时,父类事件的
Provider会被继承。 - 订阅父类事件时,子类事件同样会被分发给该订阅者。
- 装饰器流派:通过
on、use、on_global将函数注册为订阅者。 - 显式调用流派:通过
Scope.register完成注册。 - 上述方式都会返回
Subscriber,可通过.dispose()取消订阅。 - 订阅者支持优先级,值越小优先级越高。
- 订阅者参数可为任意类型,系统会尝试从
Contexts查找并注入。 - 默认情况下,参数名为
event时会注入当前事件实例。 Provider[T]负责管理参数注入流程。- 在注册阶段,系统会为每个参数遍历可用
Provider,调用Provider.validate判断是否可绑定。 - 在执行阶段,系统按绑定顺序调用
Provider.__call__;当返回值不为None时完成注入。 Depend会被作为特殊Provider处理:参数解析时直接调用其绑定方法。Provider支持优先级(值越小越高)。
Contexts是dict子类,用于在分发链路中共享数据。- 默认包含
$event(当前事件实例)与$subscriber(当前订阅者实例)。 - 订阅者执行后,返回值写入
$result。 - 参数解析异常会写入
$error,用于后续处理与调试。
publish(event):分发给所有匹配订阅者。post(event):当某个订阅者返回有效值后立即停止,并返回该值。waterfall(event):按匹配顺序串行执行订阅者,并将上一个订阅者的结果作为后续处理可用输入逐步传递。Publisher.validate用于判断事件是否由该发布者处理。Publisher.supply用于让系统主动产出事件并完成分发。use与Scope.register可指定Publisher。- 可通过
define定义发布者,并在.use等位置按名称引用。
Scope负责管理订阅者与事件的匹配和分发关系。- 所有订阅者都存储在某个
Scope中。 publish与post可指定目标Scope。
Subscriber.propagate用于注册同级传播订阅者。prepend=True时前置执行,否则在当前订阅者后执行。- 后置传播订阅者可获取上一个订阅者的返回值。
- 传播订阅者会继承当前订阅者的
Provider。 - 返回
STOP可中止同级传播。 - 若传播订阅者依赖暂未满足,系统会尝试延迟执行;若最终都无法满足,会抛出异常。
propagate支持优先级,也可接收Propagator通过compose批量提供传播订阅者。
- 订阅者执行期间发生异常时,系统会在当前
Contexts中记录$error。 - 依赖解析失败且无法通过延迟满足时,会抛出异常而非静默忽略。
- 异常可在同一分发链路中被后续逻辑感知并处理,便于统一收敛错误。
- 可通过
if_、enter_if为订阅者增加条件过滤。 - 条件表达式可基于事件字段、上下文值与组合逻辑进行匹配。
- 事件匹配除类型外,还支持更细粒度的命名或模式匹配场景,用于减少无效分发。
- 不同
Scope之间的订阅关系彼此隔离,适合模块化与测试隔离场景。 - 可在指定
Scope内调用publish、post、waterfall,实现独立分发。 - 通过传播链(前置/后置)与
STOP可实现链路短路和流程控制。