Skip to content

使用指南

基础用法

简单重试

python
from FQBase.Foundation.retry import retry

@retry(stop_max_attempt_number=3)
def fetch_data():
    response = http_client.get("https://api.example.com/data")
    return response.json()

随机延迟重试

python
@retry(stop_max_attempt_number=5, wait_random_min=100, wait_random_max=500)
def call_api():
    return api.request()

指数退避重试

python
from FQBase.Foundation.retry import retry_with_exponential_backoff

@retry_with_exponential_backoff(max_attempts=4, base_wait=200, max_wait=5000)
def get_stock_price():
    return stock_api.get_price("AAPL")

异常过滤

只重试特定异常

python
@retry(stop_max_attempt_number=3, retry_on_exception=(ConnectionError, TimeoutError))
def network_call():
    return http_client.get(url)

多异常类型过滤

python
from requests.exceptions import ConnectionError, Timeout
from FQBase.Foundation.retry import retry

@retry(stop_max_attempt_number=5, retry_on_exception=(ConnectionError, Timeout))
def fetch_with_retry():
    return requests.get("https://api.example.com/data")

区分可恢复和不可恢复异常

python
@retry(stop_max_attempt_number=3, retry_on_exception=(TemporaryError, RateLimitError))
def fragile_operation():
    if resource_not_found():
        raise PermanentError("Cannot recover")  # 不会重试,立即抛出
    return operation()

回调函数

日志记录

python
import logging

logger = logging.getLogger(__name__)

def log_retry(attempt: int, exception: Exception):
    logger.warning(f"Attempt {attempt} failed: {type(exception).__name__}: {exception}")

@retry(stop_max_attempt_number=3, on_retry=log_retry)
def unreliable_service():
    return service.call()

指标收集

python
from prometheus_client import Counter

retry_counter = Counter('retry_attempts_total', 'Total retry attempts', ['function'])

def count_retry(attempt: int, exception: Exception):
    retry_counter.labels(function='unreliable_service').inc()

@retry(stop_max_attempt_number=5, on_retry=count_retry)
def unreliable_service():
    return service.call()

告警通知

python
import alerts

def notify_retry(attempt: int, exception: Exception):
    if attempt >= 3:
        alerts.send(f"Persistent failure after {attempt} attempts: {exception}")

@retry(stop_max_attempt_number=5, on_retry=notify_retry)
def critical_operation():
    return trading_engine.execute(order)

异步重试

异步指数退避

python
import asyncio
from FQBase.Foundation.retry import async_retry_with_exponential_backoff

@async_retry_with_exponential_backoff(max_attempts=3, base_wait=100, max_wait=2000)
async def fetch_data_async():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            return await resp.json()

异步上下文管理器

python
@retry_with_exponential_backoff(max_attempts=3)
async def process_queue():
    async with RedisClient() as client:
        task = await client.dequeue()
        return await process(task)

RetryContext 用法

手动控制重试

python
from FQBase.Foundation.retry import create_retry_context

ctx = create_retry_context(
    func=fragile_operation,
    max_attempts=5,
    wait_min=100,
    wait_max=1000
)

# 检查执行前状态
print(f"Will attempt up to {ctx.max_attempts} times")

# 执行
result = ctx.execute()

# 检查执行后状态
print(f"Completed after {ctx.attempt_count} attempts")

多次执行同一上下文

python
ctx = create_retry_context(func=batch_job, max_attempts=3)

for item in items:
    ctx.execute(item)  # 每次执行重置 attempt_count
    print(f"Processed item after {ctx.attempt_count} attempts")

带回调的上下文

python
from FQBase.Foundation.retry import create_retry_context

def on_retry(attempt: int, exc: Exception):
    logger.info(f"Retry {attempt}: {exc}")

ctx = create_retry_context(
    func=api_call,
    max_attempts=5,
    wait_min=200,
    wait_max=2000,
    on_retry=on_retry
)

超时控制

最大总时间限制

python
@retry_with_exponential_backoff(max_attempts=10, max_total_time=30.0)
def long_running_operation():
    return service.call()

结合其他超时机制

python
import signal
from functools import wraps

class TimeoutError(Exception):
    pass

def timeout(seconds):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f"{func.__name__} timed out after {seconds}s")
            signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
        return wrapper
    return decorator

@timeout(5)
@retry_with_exponential_backoff(max_attempts=3, max_total_time=3.0)
def timed_retry_operation():
    return service.call()

装饰器组合

重试 + 日志

python
def retry_with_logging(func):
    logger = logging.getLogger(func.__module__)

    def log_retry(attempt, exc):
        logger.debug(f"{func.__name__} retry {attempt}: {exc}")

    return retry(
        stop_max_attempt_number=3,
        wait_random_min=50,
        wait_random_max=200,
        on_retry=log_retry
    )(func)

@retry_with_logging
def unreliable_operation():
    return service.call()

自定义重试装饰器工厂

python
def create_retry_decorator(max_attempts=3, base_wait=100, exceptions=(Exception,)):
    def decorator(func):
        return retry_with_exponential_backoff(
            max_attempts=max_attempts,
            base_wait=base_wait,
            retry_on_exception=exceptions
        )(func)
    return decorator

# 使用
@create_retry_decorator(max_attempts=5, base_wait=200, exceptions=(ConnectionError,))
def network_call():
    return http.get(url)

实际应用场景

数据库连接重试

python
from FQBase.Foundation.retry import retry_with_exponential_backoff

@retry_with_exponential_backoff(max_attempts=5, base_wait=100, max_wait=3000,
                                 retry_on_exception=(ConnectionError, OperationalError))
def get_database_connection():
    return engine.connect()

第三方 API 调用

python
@retry_with_exponential_backoff(max_attempts=3, base_wait=500, max_wait=5000,
                                 retry_on_exception=(RateLimitError, ServiceUnavailable))
def call_market_data_api():
    return market_data.get_quotes(symbols)

分布式锁获取

python
@retry(stop_max_attempt_number=10, wait_random_min=50, wait_random_max=200,
       retry_on_exception=(LockNotAvailableError,))
def acquire_lock_with_retry(lock_key: str):
    return distributed_lock.acquire(lock_key, timeout=30)

消息队列消费

python
@retry_with_exponential_backoff(max_attempts=3, base_wait=1000, max_wait=10000)
def process_message(msg):
    try:
        return handler.handle(msg)
    except ValidationError:
        msg.nack()  # 不重试
        raise
    except ProcessingError:
        msg.nack(requeue=True)  # 重试
        raise