Skip to content

MongoDB API 参考

模块路径: FQBase.DataStore.mongo_db源码: [mongo_db.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/DataStore/mongo_db.py)


一、MongoDB 类

MongoDB.__init__(config=None, *, uri=None, database='quantaxis', username=None, password=None, max_pool_size=50, min_pool_size=10, server_selection_timeout_ms=5000, connect_timeout_ms=5000, socket_timeout_ms=30000)

初始化 MongoDB 操作类。

参数类型默认值说明
configMongoDBConfigProtocol/dict/NoneNone配置对象或字典
uristrNoneMongoDB 连接 URI
databasestr"quantaxis"默认数据库名
usernamestrNone用户名
passwordstrNone密码
max_pool_sizeint50最大连接池大小
min_pool_sizeint10最小连接池大小
server_selection_timeout_msint5000服务器选择超时(毫秒)
connect_timeout_msint5000连接超时(毫秒)
socket_timeout_msint30000Socket 超时(毫秒)

二、属性

MongoDB.is_connected -> bool

检查是否已连接。

MongoDB.database -> Optional[Database]

获取数据库对象。

MongoDB.database_name -> str

获取数据库名称。

MongoDB.client_manager -> MongoClientManager

获取客户端管理器。


三、连接管理

MongoDB.health_check() -> bool

健康检查。

MongoDB.close() -> None

关闭连接。

MongoDB.reset() -> None

重置连接(关闭后重新建立)。

MongoDB.ping() -> bool

Ping 检查。


四、CRUD 操作

MongoDB.insert_one(collection: str, document: Dict[str, Any]) -> Optional[str]

插入单个文档。

参数类型说明
collectionstr集合名称
documentDict文档字典

返回值: 插入的文档 ID(字符串),失败返回 None

MongoDB.insert_many(collection: str, documents: List[Dict[str, Any]], ordered=False) -> List[str]

批量插入文档。

参数类型默认值说明
collectionstr-集合名称
documentsList[Dict]-文档列表
orderedboolFalse是否有序插入

返回值: 插入的文档 ID 列表

MongoDB.find(collection: str, query=None, projection=None, sort=None, skip=0, limit=0) -> List[Dict[str, Any]]

查询文档列表。

参数类型默认值说明
collectionstr-集合名称
queryDictNone查询条件
projectionDictNone返回字段
sortList[tuple]None排序条件
skipint0跳过的文档数
limitint0返回数量限制,0 表示不限制

返回值: 匹配的文档列表

MongoDB.find_one(collection: str, query: Dict[str, Any], projection=None) -> Optional[Dict[str, Any]]

查询单个文档。

参数类型说明
collectionstr集合名称
queryDict查询条件
projectionDict返回字段

返回值: 匹配的文档,未找到返回 None

MongoDB.find_by_id(collection: str, object_id: str) -> Optional[Dict[str, Any]]

根据 ID 查询文档。

参数类型说明
collectionstr集合名称
object_idstrObjectId 字符串

返回值: 匹配的文档,未找到或 ID 无效返回 None

MongoDB.find_as_dataframe(collection: str, query=None, projection=None, sort=None, skip=0, limit=0) -> pd.DataFrame

查询并返回 DataFrame。

参数类型默认值说明
collectionstr-集合名称
queryDictNone查询条件
projectionDictNone返回字段
sortList[tuple]None排序条件
skipint0跳过的文档数
limitint0返回数量限制

返回值: pandas DataFrame

MongoDB.find_by_page(collection: str, query=None, page=1, page_size=20, sort=None, projection=None) -> Dict[str, Any]

分页查询。

参数类型默认值说明
collectionstr-集合名称
queryDictNone查询条件
pageint1页码(从 1 开始)
page_sizeint20每页数量
sortList[tuple]None排序条件
projectionDictNone返回字段

返回值:

python
{
    'data': [...],      # 当前页数据
    'total': 100,       # 总数量
    'page': 1,          # 当前页
    'page_size': 20,    # 每页大小
    'total_pages': 5    # 总页数
}

MongoDB.count(collection: str, query=None) -> int

统计文档数量。

参数类型默认值说明
collectionstr-集合名称
queryDictNone查询条件

返回值: 匹配的文档数量

MongoDB.update_one(collection: str, query: Dict[str, Any], update: Dict[str, Any], upsert=False) -> int

更新单个文档。

参数类型默认值说明
collectionstr-集合名称
queryDict-查询条件
updateDict-更新操作
upsertboolFalse不存在时是否插入

返回值: 实际修改的文档数量

MongoDB.update_many(collection: str, query: Dict[str, Any], update: Dict[str, Any], upsert=False) -> int

批量更新文档。

参数类型默认值说明
collectionstr-集合名称
queryDict-查询条件
updateDict-更新操作
upsertboolFalse不存在时是否插入

返回值: 实际修改的文档数量

MongoDB.upsert(collection: str, query: Dict[str, Any], update: Dict[str, Any]) -> Optional[str]

更新或插入文档。

参数类型说明
collectionstr集合名称
queryDict查询条件
updateDict更新操作

返回值: 新插入文档的 ID(如果有),否则 None

MongoDB.find_one_and_update(collection: str, query: Dict[str, Any], update: Dict[str, Any], return_new=False) -> Optional[Dict[str, Any]]

原子操作:查询并更新。

参数类型默认值说明
collectionstr-集合名称
queryDict-查询条件
updateDict-更新操作
return_newboolFalse是否返回更新后的文档

返回值: 更新前或更新后的文档

MongoDB.delete_one(collection: str, query: Dict[str, Any]) -> int

删除单个文档。

参数类型说明
collectionstr集合名称
queryDict查询条件

返回值: 实际删除的文档数量

MongoDB.delete_many(collection: str, query: Dict[str, Any]) -> int

批量删除文档。

参数类型说明
collectionstr集合名称
queryDict查询条件

返回值: 实际删除的文档数量

MongoDB.exists(collection: str, query: Dict[str, Any]) -> bool

检查文档是否存在。

参数类型说明
collectionstr集合名称
queryDict查询条件

返回值: 存在返回 True


五、聚合操作

MongoDB.aggregate(collection: str, pipeline: List[Dict[str, Any]], allow_disk_use=True) -> List[Dict[str, Any]]

聚合管道查询。

参数类型默认值说明
collectionstr-集合名称
pipelineList[Dict]-聚合管道列表
allow_disk_useboolTrue是否允许使用磁盘

返回值: 聚合结果列表

MongoDB.distinct(collection: str, field: str, query=None) -> List[Any]

获取字段的唯一值列表。

参数类型默认值说明
collectionstr-集合名称
fieldstr-字段名
queryDictNone查询条件

返回值: 唯一值列表

MongoDB.group(collection: str, key, condition=None, initial=None, reduce=None, finalize=None) -> List[Dict[str, Any]]

分组统计。

参数类型默认值说明
collectionstr-集合名称
keystr/List/None-分组键
conditionDictNone条件
initialDictNone初始值
reducestrNonereduce 函数
finalizestrNonefinalize 函数

返回值: 分组结果列表


六、索引操作

MongoDB.create_index(collection: str, keys, unique=False, background=True, **kwargs) -> str

创建索引。

参数类型默认值说明
collectionstr-集合名称
keysstr/List[tuple]-索引键
uniqueboolFalse是否唯一索引
backgroundboolTrue是否后台创建

返回值: 索引名称

MongoDB.create_indexes(collection: str, indexes: List[Dict]) -> List[str]

批量创建索引。

参数类型说明
collectionstr集合名称
indexesList[Dict]索引定义列表

返回值: 创建的索引名称列表

MongoDB.list_indexes(collection: str) -> List[Dict[str, Any]]

列出集合的所有索引。

参数类型说明
collectionstr集合名称

返回值: 索引信息列表

MongoDB.drop_index(collection: str, index_name: str) -> bool

删除索引。

参数类型说明
collectionstr集合名称
index_namestr索引名称

返回值: 成功返回 True

MongoDB.drop_all_indexes(collection: str) -> bool

删除集合的所有索引。

参数类型说明
collectionstr集合名称

返回值: 成功返回 True


七、集合操作

MongoDB.list_collections() -> List[str]

列出所有集合名称。

返回值: 集合名称列表

MongoDB.create_collection(name: str, **kwargs) -> bool

创建集合。

参数类型说明
namestr集合名称

返回值: 成功返回 True

MongoDB.drop_collection(name: str) -> bool

删除集合。

参数类型说明
namestr集合名称

返回值: 成功返回 True

MongoDB.rename_collection(old_name: str, new_name: str) -> bool

重命名集合。

参数类型说明
old_namestr原名称
new_namestr新名称

返回值: 成功返回 True

MongoDB.collection_stats(collection: str) -> Dict[str, Any]

获取集合统计信息。

参数类型说明
collectionstr集合名称

返回值: 统计信息字典


八、数据库操作

MongoDB.command(command: Dict[str, Any]) -> Any

执行 MongoDB 命令。

参数类型说明
commandDict命令字典

返回值: 命令执行结果

MongoDB.get_database_stats() -> Dict[str, Any]

获取数据库统计信息。

返回值: 数据库统计信息

MongoDB.get_server_status() -> Dict[str, Any]

获取服务器状态信息。

返回值: 服务器状态信息

MongoDB.clear_cache() -> Dict[str, Any]

清除 MongoDB 内部缓存。

返回值:

python
{
    'success': True,
    'message': 'Cache cleared successfully',
    'details': {...}
}

MongoDB.compact_database() -> Dict[str, Any]

压缩数据库。

返回值:

python
{
    'success': True,
    'size_before_gb': 1.5,
    'size_after_gb': 1.2,
    'collections': {...},
    'skipped': ['system.indexes']
}

MongoDB.rotate_logs(keep=5) -> Dict[str, Any]

轮转日志文件。

参数类型默认值说明
keepint5保留的日志文件数量

返回值:

python
{
    'success': True,
    'rotated_at': '2024-01-15T10:30:00',
    'kept_files': [...],
    'deleted_files': [...]
}

九、性能分析

MongoDB.find_with_profiling(collection: str, query=None, projection=None, sort=None, skip=0, limit=0, slow_threshold_ms=1000) -> Dict[str, Any]

带性能分析的查询。

参数类型默认值说明
collectionstr-集合名称
queryDictNone查询条件
projectionDictNone返回字段
sortList[tuple]None排序条件
skipint0跳过的文档数
limitint0返回数量限制
slow_threshold_msint1000慢查询阈值(毫秒)

返回值:

python
{
    'data': [...],
    'metrics': {
        'elapsed_ms': 150.5,
        'is_slow': False,
        'threshold_ms': 1000,
        'result_count': 10
    }
}

MongoDB.aggregate_with_profiling(collection: str, pipeline: List[Dict], allow_disk_use=True, slow_threshold_ms=1000) -> Dict[str, Any]

带性能分析的聚合查询。


十、事务支持

MongoDB.with_transaction(operations: Callable[['MongoDB'], Any], read_concern=None, write_concern=None) -> Any

事务操作封装。

参数类型默认值说明
operationsCallable-在事务中执行的操作函数
read_concernstrNone读关注级别
write_concernstrNone写关注级别

返回值: 事务中操作的结果

MongoDB.bulk_write(collection: str, operations: List[Dict]) -> Dict[str, int]

批量写入操作。

参数类型说明
collectionstr集合名称
operationsList[Dict]操作列表

返回值:

python
{
    'inserted_count': 10,
    'modified_count': 5,
    'deleted_count': 2,
    'upserted_count': 1,
    'total': 18
}

十一、便捷函数

get_mongo_db(**kwargs) -> MongoDB

获取全局 MongoDB 单例实例。

返回值: MongoDB 实例

reset_mongo_db() -> None

重置全局 MongoDB 实例。


十二、环境变量

变量名默认值说明
MONGODB_URImongodb://localhost:27017MongoDB 连接 URI
MONGODBmongodb://localhost:27017MongoDB 连接 URI(备选)