Skip to content

Cache 模块设计文档

模块路径: FQBase.Cache源码: [FQBase/Cache](file:///Users/A.D.189/FQuant/FQuant.Server/FQBase/FQBase/Cache) 版本: 2.7.0 更新日期: 2026-03-29


一、设计原则

1.1 核心设计原则

原则描述实现方式
接口抽象统一缓存操作接口CacheInterface Protocol
适配器模式支持多种后端LocalCache, RedisCacheAdapter, MongoCacheAdapter
向后兼容保持现有 API 稳定渐进式迁移策略
容错设计底层故障不影响业务静默失败,返回默认值
线程安全多线程环境安全双重锁策略

1.2 设计权衡

权衡点选择理由
LocalCache 存储OrderedDict vs dict有序支持 LRU/FIFO 驱逐
Redis 客户端单例模式避免重复连接,复用连接池
序列化格式pickle + JSON支持复杂对象 (pandas/numpy)
错误处理静默失败避免缓存故障影响主流程
TTL 精度秒级平衡精度与性能

二、接口设计

2.1 CacheInterface Protocol

python
@runtime_checkable
class CacheInterface(Protocol):
    def get(self, key: str, default: Any = None) -> Any: ...
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: ...
    def delete(self, key: str) -> bool: ...
    def exists(self, key: str) -> bool: ...
    def clear(self) -> bool: ...
    def ttl(self, key: str) -> int: ...
    def expire(self, key: str, ttl: int) -> bool: ...
    def get_many(self, keys: List[str]) -> Dict[str, Any]: ...
    def set_many(self, mapping: Dict[str, Any], ttl: Optional[int] = None) -> bool: ...
    def delete_many(self, keys: List[str]) -> bool: ...

设计决策:

  • 使用 Protocol 实现结构化子类型,无需继承
  • 使用 @runtime_checkable 支持 isinstance() 检查
  • 返回值语义统一:操作型返回 bool,获取型返回数据或默认值

2.2 TTL 返回值语义

python
def ttl(self, key: str) -> int:
    """返回值语义:
    - 负数: 特殊状态
      - -1: 永不过期 (没有设置 TTL)
      - -2: 键不存在
    - 正数: 剩余生存时间 (秒)
    """

设计理由: 与 Redis TTL 命令语义一致。


三、LocalCache 设计

3.1 存储结构选择

python
# 使用 OrderedDict 而非普通 dict
self._cache: OrderedDict = OrderedDict()
self._timestamps: Optional[OrderedDict] = OrderedDict() if ttl > 0 else None

OrderedDict vs dict:

特性OrderedDictdict
元素顺序✅ 保持插入顺序✅ Python 3.7+ 保持
LRU 操作move_to_end()❌ 需要手动维护
FIFO 操作popitem(last=False)❌ 需要额外逻辑
内存开销略高较低
操作性能略低较高

决策: 使用 OrderedDict 以支持高效的 LRU/FIFO 驱逐。

3.2 单例模式设计

python
class LocalCache:
    _instances: Dict[str, 'LocalCache'] = {}  # 类级别存储

    def __new__(cls, name: str = 'default', maxsize: int = 128,
                ttl: int = 0, eviction: str = 'lru', singleton: bool = True):
        if not singleton:
            return super().__new__(cls)

        # 唯一键: 相同配置返回同一实例
        key = f"{name}:{maxsize}:{ttl}:{eviction}"
        with cls._lock:
            if key not in cls._instances:
                # 超过最大实例数,驱逐 LRU 实例
                if len(cls._instances) >= cls._max_instances:
                    cls._evict_lru_instance()
                cls._instances[key] = super().__new__(cls)
        return cls._instances[key]

设计权衡:

方案优点缺点
单例 (当前)节省内存,缓存共享配置相同时共享状态
非单例完全隔离内存占用增加
支持两者灵活API 复杂度增加

决策: 默认单例,通过 singleton=False 支持创建独立实例。

3.3 LRU 驱逐实现

python
def get(self, key: str, default: Any = None) -> Any:
    with self._cache_lock:
        if key not in self._cache:
            self._misses += 1
            return default

        # TTL 过期检查
        if self._timestamps is not None and self._ttl > 0:
            if time.time() > self._timestamps[key]:
                del self._cache[key]
                del self._timestamps[key]
                self._misses += 1
                return default

        # LRU: 移动到末尾(最常用)
        if self._eviction == 'lru':
            self._cache.move_to_end(key)

        self._hits += 1
        return self._cache[key]

LRU 实现对比:

实现方式时间复杂度实现复杂度
move_to_end()O(1)简单
双向链表 + dictO(1)复杂
记录时间戳 + 排序O(n log n)中等

决策: 使用 OrderedDict.move_to_end() 实现 O(1) LRU。

3.4 TTL 惰性清理

python
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
    with self._cache_lock:
        # 惰性清理: 每次 set 时清理少量过期项
        if self._timestamps is not None and len(self._cache) > 0:
            self._lazy_cleanup_expired(max_items=10)
        # ... 执行 set 操作

def _lazy_cleanup_expired(self, max_items: int = 100) -> int:
    """惰性清理过期缓存项

    Args:
        max_items: 单次清理的最大数量,避免单次操作耗时过长
    """

清理策略对比:

策略优点缺点
惰性清理 (当前)性能影响小,实现简单过期数据暂存
后台线程清理及时清理需要线程管理
定时清理精确控制需要调度器
惰性 + 后台 (当前)综合优势复杂度增加

决策: 惰性清理为主,后台清理线程为辅。

3.5 后台清理线程

python
class LocalCache:
    _cleanup_thread: Optional[threading.Thread] = None
    _cleanup_event: threading.Event = None

    @classmethod
    def start_cleanup_thread(cls, interval: float = 300):
        """启动后台清理线程

        Args:
            interval: 清理间隔(秒)
        """
        if hasattr(cls, '_cleanup_thread') and cls._cleanup_thread is not None:
            return  # 避免重复启动

        cls._cleanup_event = threading.Event()

        def cleanup_worker():
            while not cls._cleanup_event.is_set():
                cls._cleanup_event.wait(interval)
                if cls._cleanup_event.is_set():
                    break
                try:
                    cls.cleanup_expired_instances()
                except Exception as e:
                    logger.warning(f"Error in cleanup worker: {e}")

        cls._cleanup_thread = threading.Thread(
            target=cleanup_worker,
            daemon=True,  # 守护线程,不阻止程序退出
            name="LocalCache-Cleanup"
        )
        cls._cleanup_thread.start()

# 注册退出时的清理
atexit.register(LocalCache.stop_cleanup_thread)
LocalCache.start_cleanup_thread(interval=300)

设计决策:

  • 使用守护线程 (daemon=True) 避免阻止程序退出
  • 通过 atexit 注册确保程序退出时停止线程
  • 单例模式:整个类只启动一个清理线程

四、RedisCacheAdapter 设计

4.1 客户端单例管理

python
class _RedisClientManager:
    """Redis 客户端管理器 - 单例模式"""
    _clients: Dict[str, Any] = {}  # 客户端缓存
    _lock = threading.Lock()

    @classmethod
    def get_client(cls, host: str, port: int, db: int, password: Optional[str]) -> Any:
        key = f"{host}:{port}:{db}:{password}"
        if key not in cls._clients:
            with cls._lock:
                if key not in cls._clients:  # 双重检查锁定
                    cls._clients[key] = redis.Redis(
                        host=host, port=port, db=db, password=password,
                        decode_responses=False,  # 保持 bytes 便于序列化
                        socket_connect_timeout=5,
                        socket_timeout=5,
                        retry_on_timeout=True,
                        health_check_interval=30,  # 自动健康检查
                    )
        return cls._clients[key]

设计决策:

  • 使用 (host, port, db, password) 作为唯一键
  • 相同连接参数复用客户端
  • decode_responses=False 保持 bytes 便于统一序列化处理

4.2 自动重连机制

python
def _ensure_connected(self) -> bool:
    """确保连接有效,如已断开则尝试重连"""
    if self._connected:
        try:
            self._client.ping()
            return True
        except (redis.ConnectionError, redis.TimeoutError, OSError):
            self._connected = False

    # 重连逻辑
    try:
        _RedisClientManager.remove_client(...)
        self._client = _RedisClientManager.get_client(...)
        self._client.ping()
        self._connected = True
        return True
    except Exception:
        self._connected = False
        return False

设计权衡:

方案优点缺点
每次操作前检查自动重连每次操作额外开销
静默失败性能好需要上层处理
两者结合 (当前)平衡实现复杂

4.3 Prefix 策略

python
def _make_key(self, key: str) -> str:
    return f"{self._prefix}{key}"

# 所有操作都通过 _make_key 添加前缀
def keys(self, pattern: str = "*", limit: int = 0) -> List[str]:
    search_pattern = f"{self._prefix}{pattern}"
    # ...

设计决策:

  • 前缀默认 fqcache:,支持命名空间隔离
  • keys() 返回时会去除前缀,提供干净的键名
  • 使用 SCAN 而非 KEYS,避免阻塞

4.4 SCAN 替代 KEYS

python
def keys(self, pattern: str = "*", limit: int = 0) -> List[str]:
    """获取所有匹配的键(使用 SCAN 替代 KEYS,避免阻塞)

    Args:
        pattern: 匹配模式
        limit: 返回数量限制,0 表示不限制
    """
    result = []
    cursor = 0
    search_pattern = f"{self._prefix}{pattern}"
    prefix_len = len(self._prefix)
    scan_count = min(1000, max(100, limit)) if limit > 0 else 1000

    while True:
        cursor, keys = self._client.scan(cursor, match=search_pattern, count=scan_count)
        for k in keys:
            key_str = k.decode() if isinstance(k, bytes) else k
            if key_str.startswith(self._prefix):
                key_str = key_str[prefix_len:]  # 去除前缀
            result.append(key_str)
            if limit > 0 and len(result) >= limit:
                return result
        if cursor == 0:
            break
    return result

KEYS vs SCAN:

特性KEYSSCAN
时间复杂度O(N)O(1) 每次迭代
阻塞
返回完整性一次返回游标迭代
大数据量不建议推荐

五、序列化设计

5.1 序列化策略

python
def serialize_value(value: Any) -> bytes:
    """序列化策略:
    1. numpy array → 特殊标记 + pickle
    2. pandas DataFrame/Series → 特殊标记 + pickle
    3. dict/list → json
    4. primitive → json
    """
    if isinstancex(value, np.ndarray):
        return b'\x01' + _serialize_np(value)
    elif isinstancex(value, (pd.DataFrame, pd.Series)):
        return b'\x02' + pickle.dumps(value)
    elif isinstance(value, (dict, list)):
        return b'\x03' + json.dumps(value).encode()
    else:
        return b'\x03' + json.dumps(value).encode()

标记字节:

标记类型
\x01numpy.ndarray
\x02pandas.DataFrame/Series
\x03json 序列化

5.2 反序列化策略

python
def deserialize_value(value: bytes, pickle_first: bool = False, safe_mode: bool = False) -> Any:
    if value is None:
        return None

    if not isinstance(value, bytes):
        return value

    prefix = value[:1]
    data = value[1:]

    if pickle_first:
        # 优先尝试 pickle
        try:
            return pickle.loads(data)
        except:
            pass
        # 回退到其他格式
        if prefix == b'\x01':
            return _deserialize_np(data)
        elif prefix == b'\x03':
            return json.loads(data.decode())

    # 非 pickle_first 模式
    if prefix == b'\x01':
        return _deserialize_np(data)
    elif prefix == b'\x02':
        return pickle.loads(data)
    elif prefix == b'\x03':
        return json.loads(data.decode())

    # 未知格式,尝试 pickle
    try:
        return pickle.loads(value)
    except:
        return value

设计权衡:

参数场景决策
pickle_first=False性能优先避免不必要的 pickle 开销
pickle_first=True兼容性优先确保 pickle 数据能正确读取
safe_mode=False向后兼容默认行为
safe_mode=True安全优先处理不可信数据

六、装饰器设计

6.1 缓存键生成策略

python
def _make_cache_key(func, *args, **kwargs) -> str:
    """生成缓存键

    策略:
    1. 函数全名 (module.name) 作为键的一部分
    2. args 和 kwargs 的 repr 作为键内容
    3. SHA256 哈希确保键长度可控
    """
    func_name = func.__module__ + "." + func.__name__
    key_parts = [func_name]

    sig = inspect.signature(func)
    param_names = list(sig.parameters.keys())

    for i, arg in enumerate(args):
        if i < len(param_names):
            key_parts.append(f"{param_names[i]}={repr(arg)}")
        else:
            key_parts.append(f"arg{i}={repr(arg)}")

    for k, v in kwargs.items():
        key_parts.append(f"{k}={repr(v)}")

    key_string = "|".join(key_parts)
    return hashlib.sha256(key_string.encode()).hexdigest()

设计权衡:

方案优点缺点
直接 repr简单键可能过长
SHA256 哈希键长度固定不可读
两者结合 (当前)平衡略微复杂

6.2 动态 TTL 支持

python
def local_cache(maxsize: int = 128, ttl: int = 0,
                key_ttl_func: Callable[..., Optional[int]] = None):

    def decorator(func):
        cache = LocalCache(name=func.__name__, maxsize=maxsize, ttl=ttl)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # ... 缓存查找

            effective_ttl = ttl
            if key_ttl_func is not None:
                # 动态 TTL:根据参数决定过期时间
                effective_ttl = key_ttl_func(*args, **kwargs) or ttl

            cache.set(key, result, ttl=effective_ttl)
            return result

        return wrapper
    return decorator

应用场景:

  • 不同参数组合对应不同的缓存策略
  • 例如:热门数据短 TTL,冷门数据长 TTL

七、错误处理设计

7.1 静默失败策略

python
def get(self, key: str, default: Any = None) -> Any:
    if not self._connected:
        return default
    try:
        value = self._client.get(self._make_key(key))
        if value is None:
            return default
        return deserialize_value(value, ...)
    except Exception as e:
        logger.warning(f"RedisCacheAdapter.get failed: {e}")
        return default

设计理由:

  • 缓存是可选的性能优化层
  • 缓存故障不应导致业务失败
  • 日志记录便于问题排查

7.2 异常日志级别

操作日志级别理由
连接失败error需要关注,可能需要干预
操作失败warning暂时性问题,不紧急
键不存在不记录正常情况

八、配置设计

8.1 配置协议层次

CacheConfigProtocol (基础协议)

       ├── ttl_default: int


RedisConfigProtocol ──────────────► MongoConfigProtocol
       │                                │
       ├── redis_host                   ├── mongo_host
       ├── redis_port                   ├── mongo_port
       ├── redis_db                     ├── mongo_database
       ├── redis_password               ├── mongo_collection
       ├── redis_pickle_first           ├── mongo_username
       └── prefix                       ├── mongo_password
                                            └── prefix

8.2 工厂模式

python
def create_cache(config: CacheConfigProtocol = None) -> CacheInterface:
    """根据配置创建缓存适配器

    设计:
    1. 默认为 None,从环境变量加载配置
    2. 根据 config.cache_type 创建对应适配器
    3. LocalCache 作为最后的兜底方案
    """
    if config is None:
        config = get_cache_config()

    if config.cache_type == "redis":
        return RedisCacheAdapter(config)
    elif config.cache_type == "mongo":
        return MongoCacheAdapter(config)
    else:
        return LocalCache(name="default", ttl=config.ttl_default)

九、性能优化

9.1 批量操作优化

python
def set_many(self, mapping: Dict[str, Any], ttl: Optional[int] = None) -> bool:
    """批量设置缓存 - 使用 pipeline 减少网络往返"""
    if not self._connected:
        return False
    try:
        pipe = self._client.pipeline()  # 批量执行
        for key, value in mapping.items():
            serialized = serialize_value(value)
            if ttl:
                pipe.setex(self._make_key(key), ttl, serialized)
            else:
                pipe.set(self._make_key(key), serialized)
        pipe.execute()  # 一次网络往返
        return True
    except Exception as e:
        logger.warning(f"RedisCacheAdapter.set_many failed: {e}")
        return False

9.2 惰性清理优化

python
def set_many(self, mapping: Dict[str, Any], ttl: Optional[int] = None) -> bool:
    with self._cache_lock:
        # 根据批量大小动态调整清理数量
        if self._timestamps is not None and len(self._cache) > 0:
            cleanup_count = max(10, len(mapping) // 10)
            self._lazy_cleanup_expired(max_items=cleanup_count)

十、安全设计

10.1 safe_mode 参数

python
class RedisCacheAdapter:
    def __init__(self, ..., safe_mode: bool = False):
        self.safe_mode = safe_mode

    def get(self, key: str, default: Any = None) -> Any:
        # safe_mode=True 时可以添加额外安全检查
        value = deserialize_value(value, pickle_first=self.pickle_first,
                                   safe_mode=self.safe_mode)

10.2 pickle 安全

  • pickle 反序列化存在安全风险
  • 默认 safe_mode=False 保持向后兼容
  • 处理不可信数据时设置 safe_mode=True

十一、相关文档

文档说明
API 文档详细 API 参考
开发指南开发最佳实践
应用示例10 个应用场景
缓存适配器适配器详细说明
架构文档架构设计详解
框架文档模块框架概述
Cache Prefix 使用场景prefix 隔离策略