MongoDB 架构文档
模块路径: FQBase.DataStore.mongo_db源码: [mongo_db.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/DataStore/mongo_db.py)
一、整体架构
┌─────────────────────────────────────────────────────────────────┐
│ 应用层代码 │
│ db.insert_one("users", {"name": "test"}) │
│ db.find("users", {"age": {"$gte": 18}}) │
│ df = db.find_as_dataframe("users", query) │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ MongoDB │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ CRUD 操作 │ │
│ │ insert_one / insert_many │ │
│ │ find / find_one / find_by_id │ │
│ │ update_one / update_many / upsert │ │
│ │ delete_one / delete_many │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 查询操作 │ │
│ │ find_as_dataframe / find_by_page / count / exists │ │
│ │ distinct / aggregate / group │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 索引操作 │ │
│ │ create_index / create_indexes │ │
│ │ list_indexes / drop_index / drop_all_indexes │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 管理操作 │ │
│ │ create_collection / drop_collection / rename_collection│ │
│ │ get_server_status / clear_cache / compact_database │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 性能分析 │ │
│ │ find_with_profiling / aggregate_with_profiling │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 事务支持 │ │
│ │ with_transaction / bulk_write │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ MongoClientManager │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 连接池管理 │ │
│ │ - max_pool_size / min_pool_size │ │
│ │ - server_selection_timeout_ms │ │
│ │ - connect_timeout_ms / socket_timeout_ms │ │
│ │ - 自动重连 │ │
│ │ - 健康检查 (ping) │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ MongoDB Server │
│ (Connection Pool) │
└─────────────────────────────────────────────────────────────────┘二、组件架构
mongo_db.py
│
├── 核心类
│ └── MongoDB (单例)
│ ├── _uri: str # MongoDB URI
│ ├── _database_name: str # 数据库名称
│ ├── _client_manager # MongoClientManager
│ ├── _db: Database # 数据库实例
│ ├── _connected: bool # 连接状态
│ ├── _lock: threading.Lock # 线程锁
│ └── _config # 配置对象
│
├── CRUD 操作
│ ├── insert_one() → 插入单文档
│ ├── insert_many() → 批量插入
│ ├── find() → 查询列表
│ ├── find_one() → 查询单文档
│ ├── find_by_id() → 根据ID查询
│ ├── update_one() → 更新单文档
│ ├── update_many() → 批量更新
│ ├── upsert() → 更新或插入
│ ├── find_one_and_update() → 原子查询更新
│ ├── delete_one() → 删除单文档
│ └── delete_many() → 批量删除
│
├── 查询操作
│ ├── find_as_dataframe() → 返回 DataFrame
│ ├── find_by_page() → 分页查询
│ ├── count() → 统计数量
│ ├── exists() → 检查存在
│ ├── distinct() → 去重值
│ ├── aggregate() → 聚合管道
│ └── group() → 分组统计
│
├── 索引操作
│ ├── create_index() → 创建索引
│ ├── create_indexes() → 批量创建
│ ├── list_indexes() → 列出索引
│ ├── drop_index() → 删除索引
│ └── drop_all_indexes() → 删除所有索引
│
├── 集合操作
│ ├── list_collections() → 列出集合
│ ├── create_collection() → 创建集合
│ ├── drop_collection() → 删除集合
│ ├── rename_collection() → 重命名集合
│ └── collection_stats() → 集合统计
│
├── 数据库操作
│ ├── command() → 执行命令
│ ├── get_database_stats() → 数据库统计
│ ├── get_server_status() → 服务器状态
│ ├── clear_cache() → 清除缓存
│ ├── compact_database() → 压缩数据库
│ ├── rotate_logs() → 轮转日志
│ └── ping() → 健康检查
│
├── 性能分析
│ ├── find_with_profiling() → 带性能分析查询
│ └── aggregate_with_profiling() → 带性能分析聚合
│
└── 事务支持
├── with_transaction() → 事务操作
└── bulk_write() → 批量写入三、工作流程
3.1 查询流程
db.find("users", {"age": {"$gte": 18}}, sort=[("name", 1)], limit=10)
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 检查连接 │
│ _ensure_connected() │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 获取集合 │
│ _get_collection("users") │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 3. 执行查询 │
│ cursor = collection.find(query, projection) │
│ cursor = cursor.sort(sort).skip(skip).limit(limit) │
│ return list(cursor) │
└───────────────────────────────────────────────────────────────┘3.2 插入流程
db.insert_one("users", {"name": "test", "age": 25})
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 检查连接 │
│ _ensure_connected() │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 插入文档 │
│ result = collection.insert_one(document) │
│ return str(result.inserted_id) │
└───────────────────────────────────────────────────────────────┘3.3 DataFrame 流程
db.find_as_dataframe("stocks", {"date": {"$gte": "2024-01-01"}})
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 执行 find() 查询 │
│ results = self.find(...) │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 转为 DataFrame │
│ df = pd.DataFrame(results) │
│ df['_id'] = df['_id'].astype(str) # ObjectId 转字符串 │
└───────────────────────────────────────────────────────────────┘3.4 事务流程
db.with_transaction(operations)
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 1. 启动会话 │
│ with client.start_session() as session: │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 2. 开启事务 │
│ with session.start_transaction(): │
│ result = operations(self) │
└───────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 3. 提交/回滚 │
│ 成功自动提交,异常自动回滚 │
└───────────────────────────────────────────────────────────────┘