Skip to content

MongoDB 架构文档

模块路径: FQBase.DataStore.mongo_db源码: [mongo_db.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/DataStore/mongo_db.py)


一、整体架构

┌─────────────────────────────────────────────────────────────────┐
│                        应用层代码                                 │
│   db.insert_one("users", {"name": "test"})                     │
│   db.find("users", {"age": {"$gte": 18}})                      │
│   df = db.find_as_dataframe("users", query)                     │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                        MongoDB                                   │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    CRUD 操作                               │ │
│  │   insert_one / insert_many                                │ │
│  │   find / find_one / find_by_id                           │ │
│  │   update_one / update_many / upsert                      │ │
│  │   delete_one / delete_many                               │ │
│  └───────────────────────────────────────────────────────────┘ │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    查询操作                               │ │
│  │   find_as_dataframe / find_by_page / count / exists       │ │
│  │   distinct / aggregate / group                           │ │
│  └───────────────────────────────────────────────────────────┘ │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    索引操作                               │ │
│  │   create_index / create_indexes                          │ │
│  │   list_indexes / drop_index / drop_all_indexes           │ │
│  └───────────────────────────────────────────────────────────┘ │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    管理操作                               │ │
│  │   create_collection / drop_collection / rename_collection│ │
│  │   get_server_status / clear_cache / compact_database      │ │
│  └───────────────────────────────────────────────────────────┘ │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    性能分析                               │ │
│  │   find_with_profiling / aggregate_with_profiling         │ │
│  └───────────────────────────────────────────────────────────┘ │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    事务支持                               │ │
│  │   with_transaction / bulk_write                          │ │
│  └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                   MongoClientManager                             │
│  ┌───────────────────────────────────────────────────────────┐ │
│  │                    连接池管理                              │ │
│  │   - max_pool_size / min_pool_size                       │ │
│  │   - server_selection_timeout_ms                         │ │
│  │   - connect_timeout_ms / socket_timeout_ms               │ │
│  │   - 自动重连                                             │ │
│  │   - 健康检查 (ping)                                      │ │
│  └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                        MongoDB Server                            │
│                   (Connection Pool)                             │
└─────────────────────────────────────────────────────────────────┘

二、组件架构

mongo_db.py

├── 核心类
│   └── MongoDB (单例)
│       ├── _uri: str                    # MongoDB URI
│       ├── _database_name: str          # 数据库名称
│       ├── _client_manager             # MongoClientManager
│       ├── _db: Database               # 数据库实例
│       ├── _connected: bool            # 连接状态
│       ├── _lock: threading.Lock       # 线程锁
│       └── _config                     # 配置对象

├── CRUD 操作
│   ├── insert_one()        → 插入单文档
│   ├── insert_many()       → 批量插入
│   ├── find()              → 查询列表
│   ├── find_one()          → 查询单文档
│   ├── find_by_id()        → 根据ID查询
│   ├── update_one()         → 更新单文档
│   ├── update_many()        → 批量更新
│   ├── upsert()            → 更新或插入
│   ├── find_one_and_update() → 原子查询更新
│   ├── delete_one()         → 删除单文档
│   └── delete_many()        → 批量删除

├── 查询操作
│   ├── find_as_dataframe()  → 返回 DataFrame
│   ├── find_by_page()       → 分页查询
│   ├── count()              → 统计数量
│   ├── exists()             → 检查存在
│   ├── distinct()            → 去重值
│   ├── aggregate()           → 聚合管道
│   └── group()               → 分组统计

├── 索引操作
│   ├── create_index()       → 创建索引
│   ├── create_indexes()     → 批量创建
│   ├── list_indexes()       → 列出索引
│   ├── drop_index()         → 删除索引
│   └── drop_all_indexes()   → 删除所有索引

├── 集合操作
│   ├── list_collections()   → 列出集合
│   ├── create_collection()  → 创建集合
│   ├── drop_collection()    → 删除集合
│   ├── rename_collection()  → 重命名集合
│   └── collection_stats()   → 集合统计

├── 数据库操作
│   ├── command()            → 执行命令
│   ├── get_database_stats() → 数据库统计
│   ├── get_server_status()  → 服务器状态
│   ├── clear_cache()        → 清除缓存
│   ├── compact_database()   → 压缩数据库
│   ├── rotate_logs()       → 轮转日志
│   └── ping()               → 健康检查

├── 性能分析
│   ├── find_with_profiling()    → 带性能分析查询
│   └── aggregate_with_profiling() → 带性能分析聚合

└── 事务支持
    ├── with_transaction()   → 事务操作
    └── bulk_write()        → 批量写入

三、工作流程

3.1 查询流程

db.find("users", {"age": {"$gte": 18}}, sort=[("name", 1)], limit=10)


┌───────────────────────────────────────────────────────────────┐
│ 1. 检查连接                                                     │
│    _ensure_connected()                                         │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 获取集合                                                     │
│    _get_collection("users")                                   │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 3. 执行查询                                                     │
│    cursor = collection.find(query, projection)                │
│    cursor = cursor.sort(sort).skip(skip).limit(limit)         │
│    return list(cursor)                                        │
└───────────────────────────────────────────────────────────────┘

3.2 插入流程

db.insert_one("users", {"name": "test", "age": 25})


┌───────────────────────────────────────────────────────────────┐
│ 1. 检查连接                                                     │
│    _ensure_connected()                                         │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 插入文档                                                     │
│    result = collection.insert_one(document)                    │
│    return str(result.inserted_id)                             │
└───────────────────────────────────────────────────────────────┘

3.3 DataFrame 流程

db.find_as_dataframe("stocks", {"date": {"$gte": "2024-01-01"}})


┌───────────────────────────────────────────────────────────────┐
│ 1. 执行 find() 查询                                            │
│    results = self.find(...)                                   │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 转为 DataFrame                                              │
│    df = pd.DataFrame(results)                                 │
│    df['_id'] = df['_id'].astype(str)  # ObjectId 转字符串   │
└───────────────────────────────────────────────────────────────┘

3.4 事务流程

db.with_transaction(operations)


┌───────────────────────────────────────────────────────────────┐
│ 1. 启动会话                                                     │
│    with client.start_session() as session:                    │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 2. 开启事务                                                     │
│    with session.start_transaction():                           │
│        result = operations(self)                               │
└───────────────────────────────────────────────────────────────┘


┌───────────────────────────────────────────────────────────────┐
│ 3. 提交/回滚                                                   │
│    成功自动提交,异常自动回滚                                    │
└───────────────────────────────────────────────────────────────┘