Skip to content

MongoClientManager 架构文档

模块路径: FQBase.DataStore.mongo_client源码: [mongo_client.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/DataStore/mongo_client.py)


一、整体架构

┌─────────────────────────────────────────────────────────────────┐
│                        应用层代码                                 │
│   get_mongo_client_manager("mongodb://...")                      │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                   MongoClientManager                              │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                   类级别组件                               │ │
│  │   _instances: Dict[str, MongoClientManager]  # 实例缓存    │ │
│  │   _lock: threading.Lock                    # 创建锁         │ │
│  │   _ref_counts: Dict[str, int]               # 引用计数     │ │
│  │   _cleanup_registered: bool                # atexit 已注册  │ │
│  └───────────────────────────────────────────────────────────┘ │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                   实例级别组件                           │ │
│  │   _uri: str                              # MongoDB URI  │ │
│  │   _max_pool_size: int                     # 最大连接池    │ │
│  │   _client: Optional[MongoClient]          # 客户端实例    │ │
│  │   _client_lock: threading.Lock             # 客户端锁     │ │
│  │   _initialized: bool                     # 初始化标志   │ │
│  └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                        MongoClient                                │
│                   (PyMongo MongoClient)                          │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                   连接池配置                              │ │
│  │   maxPoolSize: 50                                        │ │
│  │   minPoolSize: 10                                        │ │
│  │   serverSelectionTimeoutMS: 5000                          │ │
│  │   connectTimeoutMS: 5000                                   │ │
│  │   socketTimeoutMS: 30000                                   │ │
│  └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                       MongoDB Server                              │
└─────────────────────────────────────────────────────────────────┘

二、组件架构

mongo_client.py

├── 核心类
│   └── MongoClientManager
│       ├── 类级别
│       │   ├── _instances: Dict[str, MongoClientManager]
│       │   ├── _lock: threading.Lock
│       │   ├── _ref_counts: Dict[str, int]
│       │   └── _cleanup_registered: bool
│       │
│       ├── 实例级别
│       │   ├── _uri: str
│       │   ├── _max_pool_size: int
│       │   ├── _min_pool_size: int
│       │   ├── _server_selection_timeout_ms: int
│       │   ├── _connect_timeout_ms: int
│       │   ├── _socket_timeout_ms: int
│       │   ├── _client: Optional[MongoClient]
│       │   ├── _client_lock: threading.Lock
│       │   ├── _initialized: bool
│       │   └── _connect_time: int
│       │
│       └── 方法
│           ├── __new__()          # 创建实例(单例逻辑)
│           ├── __init__()          # 初始化实例
│           ├── _create_client()    # 创建客户端(带重试)
│           ├── client @property    # 获取客户端(延迟初始化)
│           ├── ping()             # Ping 检查
│           ├── is_connected()     # 连接状态
│           ├── get_pool_stats()    # 连接池统计
│           ├── health_check_detailed()  # 详细健康检查
│           ├── close()             # 关闭连接
│           ├── reset_client()      # 重置连接
│           ├── clear_all()         # 清除所有实例
│           ├── release()           # 释放引用
│           ├── get_instance_count()    # 获取实例数
│           ├── get_ref_count()     # 获取引用计数
│           └── _cleanup_at_exit()   # 退出时清理

└── 便捷函数
    └── get_mongo_client_manager()  # 获取管理器实例

三、创建流程

MongoClientManager(uri, max_pool_size=50, ...)


┌───────────────────────────────────────────────────────────────┐
│ 1. 生成唯一 Key                                               │
│    key = f"{uri}:{max_pool_size}:{min_pool_size}:..."      │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 获取类锁                                                   │
│    with cls._lock:                                           │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 3. 检查实例是否存在                                           │
│    if key not in cls._instances:                             │
│        创建新实例                                             │
│    cls._ref_counts[key] += 1                                │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 4. 返回实例                                                   │
│    return cls._instances[key]                                │
└───────────────────────────────────────────────────────────────┘

四、连接获取流程

manager.client


┌───────────────────────────────────────────────────────────────┐
│ 1. 获取客户端锁                                               │
│    with self._client_lock:                                   │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 检查客户端是否已创建                                       │
│    if self._client is None:                                  │
│        self._client = self._create_client()                │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 3. 返回客户端                                                 │
│    return self._client                                       │
└───────────────────────────────────────────────────────────────┘

五、引用计数流程

┌─────────────────────────────────────────────────────────────────┐
│                     引用计数管理                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  创建实例                                                      │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │  __new__() 执行                                          │  │
│  │  cls._ref_counts[key] += 1                               │  │
│  └─────────────────────────────────────────────────────────┘  │
│                              │                                 │
│                              ▼                                 │
│  释放实例                                                      │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │  release(uri, ...) 执行                                   │  │
│  │  cls._ref_counts[key] -= 1                               │  │
│  │  if cls._ref_counts[key] <= 0:                           │  │
│  │      instance.close()                                     │  │
│  │      del cls._instances[key]                             │  │
│  │      del cls._ref_counts[key]                            │  │
│  └─────────────────────────────────────────────────────────┘  │
│                              │                                 │
│                              ▼                                 │
│  程序退出                                                      │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │  atexit.register(_cleanup_at_exit)                       │  │
│  │  → clear_all() → 关闭所有实例                            │  │
│  └─────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

六、健康检查流程

manager.health_check_detailed()


┌───────────────────────────────────────────────────────────────┐
│ 1. 检查客户端是否初始化                                        │
│    if self._client is None:                                  │
│        return {'errors': ['MongoDB client not initialized']} │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 执行 ping 并测量延迟                                       │
│    start = time.time()                                       │
│    self._client.admin.command('ping')                        │
│    result['latency_ms'] = (time.time() - start) * 1000       │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 3. 获取服务器信息                                              │
│    result['server_info'] = self._client.server_info()        │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 4. 返回健康状态                                                │
│    {                                                          │
│        'healthy': True/False,                                │
│        'connected': True/False,                               │
│        'latency_ms': 12.5,                                    │
│        'server_info': {...},                                  │
│        'errors': []                                           │
│    }                                                          │
└───────────────────────────────────────────────────────────────┘