Skip to content

EventBus 最佳实践

模块路径: FQBase.Core.event_bus源码: [event_bus.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/Core/event_bus.py)


一、事件命名规范

python
# 推荐:使用 领域_动作 格式
EVENTS = {
    "trade_submitted",      # 交易提交
    "trade_executed",        # 交易执行
    "trade_cancelled",       # 交易取消
    "position_opened",       # 持仓开仓
    "position_closed",       # 持仓平仓
    "risk_alert",            # 风控警告
}

二、错误处理

python
def safe_handler(event: Event):
    try:
        process_data(event.data)
    except ValueError as e:
        print(f"数据错误: {e}")
    except Exception as e:
        print(f"处理异常: {e}")
        # 不重新抛出,避免中断其他订阅者

bus.subscribe("data_update", safe_handler)

三、性能优化

python
# 推荐:避免在处理器中执行耗时操作
def slow_handler(event: Event):
    send_to_queue(event)  # 放入队列异步处理

# 推荐:耗时操作使用 publish_async
bus.publish_async(event)

四、内存管理

python
# 推荐:短期对象使用弱引用
class ShortLivedComponent:
    def on_event(self, event: Event):
        print(event.data)

component = ShortLivedComponent()
bus.subscribe("data", component.on_event, weak_ref=True)

五、优先级设计

python
PRIORITY_HIGH = 100   # 风控检查(最先执行)
PRIORITY_NORMAL = 50  # 正常业务处理
PRIORITY_LOW = 0     # 日志记录
PRIORITY_AUDIT = -1  # 审计追踪(最后执行)

bus.subscribe("trade", check_risk, priority=PRIORITY_HIGH)
bus.subscribe("trade", process_trade, priority=PRIORITY_NORMAL)
bus.subscribe("trade", log_trade, priority=PRIORITY_LOW)
bus.subscribe("trade", audit_trail, priority=PRIORITY_AUDIT)