Skip to content

EventBus API 参考

模块路径: FQBase.Core.event_bus源码: [event_bus.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/Core/event_bus.py)


一、EventBus 类

EventBus.__init__(max_history: int = 100)

初始化事件总线。

参数类型默认值说明
max_historyint100最大历史记录数

EventBus.subscribe(event_type: str, callback: Callable, priority: int = 0, weak_ref: bool = False) -> str

订阅事件。

参数类型默认值说明
event_typestr-事件类型
callbackCallable-回调函数
priorityint0优先级,数值越大越先执行
weak_refboolFalse是否使用弱引用

返回值: 订阅ID,用于取消订阅

EventBus.unsubscribe(event_type: str, callback: Callable) -> None

取消订阅。

EventBus.unsubscribe_by_id(subscriber_id: str) -> bool

通过订阅ID取消订阅。

参数类型说明
subscriber_idstr订阅ID

返回值: 是否成功取消

EventBus.subscribe_global(callback: Callable, priority: int = 0, weak_ref: bool = False) -> str

订阅所有事件。

EventBus.unsubscribe_global(callback: Callable) -> None

取消全局订阅。

EventBus.publish(event: Event) -> None

同步发布事件。调用线程会被阻塞,等待所有订阅者回调执行完毕后才会返回。

特点: 立即发送消息,同步阻塞等待回调执行完成。

EventBus.publish_async(event: Event, callback: Optional[Callable] = None) -> asyncio.Future

异步发布事件。调用后立即返回,不阻塞当前线程,事件发送和回调执行在后台线程池中进行。

参数类型说明
eventEvent事件对象
callbackOptional[Callable]完成回调

返回值: asyncio.Future 对象

特点: 立即发送消息,非阻塞立即返回。

EventBus.publishAwait(event: Event) -> None

异步等待发布事件(async/await)。使用 asyncio 并行执行所有订阅者回调。

特点: 立即发送消息,异步并行执行回调,需要在 async 上下文中使用 await 调用。

三种发布方式对比

特性publish()publish_async()publishAwait()
发送时机立即发送立即发送立即发送
是否阻塞同步阻塞,等待回调执行完非阻塞,立即返回异步等待,需要 await
执行方式在当前线程顺序执行后台线程池并行执行asyncio 并行执行
返回值Noneasyncio.FutureNone
适用场景同步流程、需要确认回调完成不阻塞主流程、耗时操作async/await 异步上下文

EventBus.get_history(event_type: str = None, limit: int = 100) -> List[Event]

获取事件历史。

参数类型默认值说明
event_typestrNone事件类型过滤
limitint100返回数量限制

返回值: 事件列表

EventBus.clear_history() -> None

清除事件历史。

EventBus.get_subscriber_count(event_type: str = None) -> int

获取订阅者数量。

EventBus.get_subscribers(event_type: str = None) -> List[Dict[str, Any]]

获取订阅者列表。

EventBus.cleanup() -> int

清理失效的弱引用订阅者。

返回值: 清理的订阅者数量


二、Event 类

python
class Event:
    def __init__(self, event_type: str, data: Any = None, **kwargs):
        self.event_type = event_type
        self.name = event_type
        self.data = data
        self.timestamp = datetime.now()
        self.extra = kwargs
属性类型说明
event_typestr事件类型
namestr事件名称(等同于 event_type)
dataAny事件数据
timestampdatetime事件时间戳
extradict额外参数

三、便捷函数

python
def get_event_bus() -> EventBus:
    """获取事件总线单例实例"""
    return EventBus()

四、环境变量

EventBus 模块本身不依赖环境变量。


五、相关文档

文档说明
framework.md框架概述
architecture.md架构设计
design.md设计决策
usage.md使用指南
best-practices.md最佳实践
celery/api.mdEventBus Celery API