EventBus 架构文档
模块路径: FQBase.Core.event_bus源码: [event_bus.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/Core/event_bus.py)
一、整体架构
┌─────────────────────────────────────────────────────────────────┐
│ 应用层代码 │
│ 订阅事件 发布事件 │
└─────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ EventBus (单例) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 订阅者管理 │ │
│ │ _subscribers: Dict[str, List[Subscription]] │ │
│ │ _global_subscribers: List[Subscription] │ │
│ │ _subscriber_lock: threading.Lock │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 事件历史 │ │
│ │ EventHistory (环形缓冲区, maxlen=100) │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Type Subscribers│ │ Global Subscribers│
│ ─────────────── │ │ ─────────────── │
│ order_submit │ │ log_all │
│ trade_signal │ │ monitor │
│ price_change │ │ │
└─────────────────┘ └─────────────────┘二、核心组件
EventBus (单例)
├── 订阅者存储
│ ├── _subscribers: Dict[str, List[Subscription]]
│ ├── _global_subscribers: List[Subscription]
│ └── _subscriber_lock: threading.Lock
│
├── 订阅者ID管理
│ ├── _subscriber_id_counter: int
│ └── _subscriber_ids: Dict[int, tuple]
│
├── 历史记录
│ └── _history: EventHistory (环形缓冲区)
│
└── 线程池
└── _executor: ThreadPoolExecutor (max_workers=4)三、发布流程
bus.publish(Event("order", data={...}))
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 添加到历史记录 │
│ _history.add(event) │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 获取类型订阅者列表 (快照) │
│ type_subscribers = list(_subscribers.get(event_type, [])) │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 3. 执行类型订阅者 (按 priority 降序) │
│ for sub in type_subscribers: │
│ _invoke_callback(sub, event) │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 4. 执行全局订阅者 (按 priority 降序) │
│ global_subscribers = list(_global_subscribers) │
│ for sub in global_subscribers: │
│ _invoke_callback(sub, event) │
└───────────────────────────────────────────────────────────────┘