MongoClientManager 架构文档
模块路径: FQBase.DataStore.mongo_client源码: [mongo_client.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/DataStore/mongo_client.py)
一、整体架构
┌─────────────────────────────────────────────────────────────────┐
│ 应用层代码 │
│ get_mongo_client_manager("mongodb://...") │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ MongoClientManager │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 类级别组件 │ │
│ │ _instances: Dict[str, MongoClientManager] # 实例缓存 │ │
│ │ _lock: threading.Lock # 创建锁 │ │
│ │ _ref_counts: Dict[str, int] # 引用计数 │ │
│ │ _cleanup_registered: bool # atexit 已注册 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 实例级别组件 │ │
│ │ _uri: str # MongoDB URI │ │
│ │ _max_pool_size: int # 最大连接池 │ │
│ │ _client: Optional[MongoClient] # 客户端实例 │ │
│ │ _client_lock: threading.Lock # 客户端锁 │ │
│ │ _initialized: bool # 初始化标志 │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ MongoClient │
│ (PyMongo MongoClient) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 连接池配置 │ │
│ │ maxPoolSize: 50 │ │
│ │ minPoolSize: 10 │ │
│ │ serverSelectionTimeoutMS: 5000 │ │
│ │ connectTimeoutMS: 5000 │ │
│ │ socketTimeoutMS: 30000 │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ MongoDB Server │
└─────────────────────────────────────────────────────────────────┘二、组件架构
mongo_client.py
│
├── 核心类
│ └── MongoClientManager
│ ├── 类级别
│ │ ├── _instances: Dict[str, MongoClientManager]
│ │ ├── _lock: threading.Lock
│ │ ├── _ref_counts: Dict[str, int]
│ │ └── _cleanup_registered: bool
│ │
│ ├── 实例级别
│ │ ├── _uri: str
│ │ ├── _max_pool_size: int
│ │ ├── _min_pool_size: int
│ │ ├── _server_selection_timeout_ms: int
│ │ ├── _connect_timeout_ms: int
│ │ ├── _socket_timeout_ms: int
│ │ ├── _client: Optional[MongoClient]
│ │ ├── _client_lock: threading.Lock
│ │ ├── _initialized: bool
│ │ └── _connect_time: int
│ │
│ └── 方法
│ ├── __new__() # 创建实例(单例逻辑)
│ ├── __init__() # 初始化实例
│ ├── _create_client() # 创建客户端(带重试)
│ ├── client @property # 获取客户端(延迟初始化)
│ ├── ping() # Ping 检查
│ ├── is_connected() # 连接状态
│ ├── get_pool_stats() # 连接池统计
│ ├── health_check_detailed() # 详细健康检查
│ ├── close() # 关闭连接
│ ├── reset_client() # 重置连接
│ ├── clear_all() # 清除所有实例
│ ├── release() # 释放引用
│ ├── get_instance_count() # 获取实例数
│ ├── get_ref_count() # 获取引用计数
│ └── _cleanup_at_exit() # 退出时清理
│
└── 便捷函数
└── get_mongo_client_manager() # 获取管理器实例三、创建流程
MongoClientManager(uri, max_pool_size=50, ...)
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 生成唯一 Key │
│ key = f"{uri}:{max_pool_size}:{min_pool_size}:..." │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 获取类锁 │
│ with cls._lock: │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 3. 检查实例是否存在 │
│ if key not in cls._instances: │
│ 创建新实例 │
│ cls._ref_counts[key] += 1 │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 4. 返回实例 │
│ return cls._instances[key] │
└───────────────────────────────────────────────────────────────┘四、连接获取流程
manager.client
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 获取客户端锁 │
│ with self._client_lock: │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 检查客户端是否已创建 │
│ if self._client is None: │
│ self._client = self._create_client() │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 3. 返回客户端 │
│ return self._client │
└───────────────────────────────────────────────────────────────┘五、引用计数流程
┌─────────────────────────────────────────────────────────────────┐
│ 引用计数管理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 创建实例 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ __new__() 执行 │ │
│ │ cls._ref_counts[key] += 1 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 释放实例 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ release(uri, ...) 执行 │ │
│ │ cls._ref_counts[key] -= 1 │ │
│ │ if cls._ref_counts[key] <= 0: │ │
│ │ instance.close() │ │
│ │ del cls._instances[key] │ │
│ │ del cls._ref_counts[key] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 程序退出 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ atexit.register(_cleanup_at_exit) │ │
│ │ → clear_all() → 关闭所有实例 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘六、健康检查流程
manager.health_check_detailed()
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 检查客户端是否初始化 │
│ if self._client is None: │
│ return {'errors': ['MongoDB client not initialized']} │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 执行 ping 并测量延迟 │
│ start = time.time() │
│ self._client.admin.command('ping') │
│ result['latency_ms'] = (time.time() - start) * 1000 │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 3. 获取服务器信息 │
│ result['server_info'] = self._client.server_info() │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 4. 返回健康状态 │
│ { │
│ 'healthy': True/False, │
│ 'connected': True/False, │
│ 'latency_ms': 12.5, │
│ 'server_info': {...}, │
│ 'errors': [] │
│ } │
└───────────────────────────────────────────────────────────────┘