MongoDB 使用指南
模块路径: FQBase.DataStore.mongo_db源码: [mongo_db.py](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/DataStore/mongo_db.py)
一、基本使用
1.1 初始化连接
python
from FQBase.DataStore import MongoDB, get_mongo_db
db = MongoDB(database="mydb")
db = get_mongo_db(database="mydb")1.2 上下文管理器
python
with MongoDB(database="mydb") as db:
db.insert_one("users", {"name": "test", "age": 25})1.3 配置连接
python
db = MongoDB(
uri="mongodb://localhost:27017",
database="mydb",
username="admin",
password="password",
max_pool_size=50,
min_pool_size=10
)二、CRUD 操作
2.1 插入
python
db = get_mongo_db()
db.insert_one("users", {"name": "test", "age": 25})
users = [
{"name": "user1", "age": 20},
{"name": "user2", "age": 30},
]
db.insert_many("users", users)2.2 查询
python
all_users = db.find("users")
young_users = db.find("users", {"age": {"$gte": 18}})
user = db.find_one("users", {"name": "test"})
user_by_id = db.find_by_id("users", "507f1f77bcf86cd799439011")
exists = db.exists("users", {"name": "test"})2.3 更新
python
db.update_one("users", {"name": "test"}, {"$set": {"age": 30}})
db.update_many("users", {"age": {"$lt": 18}}, {"$set": {"status": "minor"}})
db.upsert("users", {"name": "newuser"}, {"$set": {"name": "newuser", "age": 25}})2.4 删除
python
db.delete_one("users", {"name": "test"})
db.delete_many("users", {"age": {"$lt": 18}})三、查询操作
3.1 条件查询
python
db.find("users", {"age": {"$gte": 18, "$lte": 65}})
db.find("users", {"name": {"$regex": "^张"}})
db.find("users", {"tags": {"$in": ["vip", "active"]}})
db.find("users", {"$or": [{"age": {"$lt": 18}}, {"status": "inactive"}]})3.2 字段投影
python
db.find("users", {}, projection={"name": 1, "age": 1})
db.find("users", {}, projection={"password": 0})
db.find("users", {"_id": 0, "name": 1})3.3 排序分页
python
from pymongo import ASCENDING, DESCENDING
db.find("users", sort=[("age", ASCENDING)])
db.find("users", sort=[("age", DESCENDING), ("name", ASCENDING)])
db.find("users", skip=10, limit=20)3.4 分页查询
python
result = db.find_by_page("users", page=1, page_size=20)
print(result['data'])
print(f"Total: {result['total']}, Pages: {result['total_pages']}")3.5 DataFrame
python
df = db.find_as_dataframe("stocks", {"date": {"$gte": "2024-01-01"}})
print(df.head())
df.to_csv("stocks.csv")四、聚合查询
4.1 基础聚合
python
pipeline = [
{"$match": {"status": "active"}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}},
{"$sort": {"count": -1}}
]
result = db.aggregate("employees", pipeline)4.2 多阶段聚合
python
pipeline = [
{"$match": {"date": {"$gte": "2024-01-01"}}},
{"$group": {"_id": "$code", "avg_price": {"$avg": "$price"}, "total_volume": {"$sum": "$volume"}}},
{"$sort": {"total_volume": -1}},
{"$limit": 10}
]
top_stocks = db.aggregate("stock_daily", pipeline)4.3 去重统计
python
departments = db.distinct("employees", "department")
active_departments = db.distinct("employees", "department", {"status": "active"})五、索引管理
5.1 创建索引
python
from pymongo import ASCENDING, DESCENDING
db.create_index("users", "name", unique=True)
db.create_index("users", [("age", ASCENDING), ("name", DESCENDING)])5.2 批量创建
python
indexes = [
{"keys": [("code", ASCENDING), ("date", DESCENDING)]},
{"keys": "created_at", "unique": False},
{"keys": [("status", ASCENDING), ("priority", DESCENDING)], "unique": False}
]
db.create_indexes("tasks", indexes)5.3 索引操作
python
indexes = db.list_indexes("users")
for idx in indexes:
print(idx['name'], idx['key'])
db.drop_index("users", "name_1")六、事务操作
6.1 转账示例
python
def transfer_funds(db, from_id, to_id, amount):
db.update_one("accounts", {"_id": from_id}, {"$inc": {"balance": -amount}})
db.update_one("accounts", {"_id": to_id}, {"$inc": {"balance": amount}})
return True
result = db.with_transaction(
lambda db: transfer_funds(db, "A", "B", 100)
)6.2 批量写入
python
operations = [
{"type": "insert_one", "document": {"name": "user1", "age": 20}},
{"type": "update_one", "query": {"name": "user2"}, "update": {"$set": {"age": 30}}},
{"type": "delete_one", "query": {"name": "user3"}},
]
result = db.bulk_write("users", operations)
print(result)七、量化交易场景
7.1 存储股票数据
python
db.insert_one("stock_daily", {
"code": "000001",
"date": "2024-01-15",
"open": 12.50,
"high": 13.00,
"low": 12.30,
"close": 12.80,
"volume": 1000000
})7.2 查询股票数据
python
df = db.find_as_dataframe("stock_daily", {
"code": "000001",
"date": {"$gte": "2024-01-01", "$lte": "2024-12-31"}
}, sort=[("date", ASCENDING)])7.3 计算日收益率
python
pipeline = [
{"$match": {"code": "000001", "date": {"$gte": "2024-01-01"}}},
{"$sort": {"date": 1}},
{"$project": {"code": 1, "date": 1, "close": 1, "prev_close": {"$shift": {"output": "$close", "by": -1, "default": None}}}},
{"$addFields": {"return": {"$cond": [{"ne": ["$prev_close", None]}, {"$divide": [{"$subtract": ["$close", "$prev_close"]}, "$prev_close"]}, None]}}},
]
result = db.aggregate("stock_daily", pipeline)7.4 存储回测结果
python
db.insert_one("backtest_results", {
"strategy": "均值回归",
"start_date": "2024-01-01",
"end_date": "2024-12-31",
"total_return": 0.156,
"sharpe_ratio": 2.1,
"max_drawdown": 0.083,
"trades": [...]
})7.5 存储用户持仓
python
db.update_one("positions", {"user_id": "user1", "code": "000001"}, {
"$set": {"volume": 1000, "avg_price": 12.50, "updated_at": "2024-01-15"}
}, upsert=True)八、完整示例
python
from FQBase.DataStore import MongoDB, get_mongo_db
from pymongo import ASCENDING
class StockDataManager:
def __init__(self):
self.db = get_mongo_db(database="quant")
def save_daily_data(self, code, date, data):
doc = {
"code": code,
"date": date,
**data
}
self.db.upsert("stock_daily", {"code": code, "date": date}, {"$set": doc})
def get_stock_data(self, code, start_date, end_date):
return self.db.find_as_dataframe("stock_daily", {
"code": code,
"date": {"$gte": start_date, "$lte": end_date}
}, sort=[("date", ASCENDING)])
def get_top_volume(self, date, limit=10):
pipeline = [
{"$match": {"date": date}},
{"$sort": {"volume": -1}},
{"$limit": limit},
{"$project": {"code": 1, "volume": 1, "close": 1}}
]
return self.db.aggregate("stock_daily", pipeline)
manager = StockDataManager()
manager.save_daily_data("000001", "2024-01-15", {"close": 12.50, "volume": 1000000})
df = manager.get_stock_data("000001", "2024-01-01", "2024-12-31")