Files
etf/docs/universal_fetcher_ARCHITECTURE.md
aszerW 0e531a1876 docs: 添加完整项目文档
- universal_fetcher_README.md:统一数据获取接口完整文档
- universal_fetcher_QUICKSTART.md:5分钟快速上手指南
- universal_fetcher_ARCHITECTURE.md:架构设计说明
- universal_fetcher_TEST_REPORT.md:测试报告与修复记录
- flask_api_README.md:Flask API 完整文档
- FLASK_SERVICE_SUMMARY.md:项目实现总结

总计 2000+ 行文档,涵盖 API 说明、使用示例、架构设计
2026-05-07 21:20:03 +08:00

12 KiB
Raw Permalink Blame History

统一数据获取接口 - 架构设计

设计目标

创建一个简单易用的数据获取接口,能够:

  1. 自动识别资产类型A股、美股、港股、期货、加密货币等
  2. 自动路由到正确的数据源Tushare、YFinance、CCXT
  3. 统一返回格式化的 DataFrame
  4. 容错处理(重试机制、错误处理)

整体架构

┌─────────────────────────────────────────────────────────┐
│              UniversalDataFetcher                        │
│                                                         │
│  用户调用: fetch(code, start, end)                       │
└───────────────────┬─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────┐
│           AssetTypeDetector.detect(code)                 │
│                                                         │
│  输入: "000300.SH"                                      │
│  输出: "china_index"                                    │
└───────────────────┬─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────┐
│              数据源路由器                                 │
│                                                         │
│  china_index  → _fetch_china()   → Tushare              │
│  china_etf    → _fetch_china()   → Tushare              │
│  china_stock  → _fetch_china()   → Tushare              │
│  hk_*         → _fetch_yfinance() → YFinance            │
│  us_*         → _fetch_yfinance() → YFinance            │
│  futures      → _fetch_futures()  → Tushare             │
│  crypto       → _fetch_crypto()   → CCXT/OKX            │
└───────────────────┬─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────┐
│           标准化 DataFrame 返回                           │
│                                                         │
│  Index: DatetimeIndex                                   │
│  Columns: [open, high, low, close, volume, code]        │
└─────────────────────────────────────────────────────────┘

核心组件

1. AssetTypeDetector资产类型检测器

职责: 根据代码格式自动识别资产类型

检测规则(按优先级):

1. 加密货币代码集合匹配 (BTC, ETH, ...)
   └─ → crypto

2. 期货后缀匹配 (.SHF, .DCE, .CZC, ...)
   └─ → futures

3. 港股后缀匹配 (.HK)
   ├─ 在 YF_CODE_MAP 中 → hk_index
   └─ 否则             → hk_stock

4. A股后缀匹配 (.SH, .SZ, .SS, .CSI)
   ├─ 6位数字代码:
   │   ├─ 前缀 000,001,002,399,930,931,932 → china_index
   │   ├─ 前缀 51,52,56,58,15,16          → china_etf
   │   └─ 其他                            → china_stock
   └─ 非6位数字 → china_stock

5. 美股指数映射表匹配 (NDX, SPX, ...)
   └─ → us_index

6. 默认
   └─ → us_stock

代码示例:

class AssetTypeDetector:
    @classmethod
    def detect(cls, code: str) -> str:
        # 加密货币优先
        if code.upper() in cls.CRYPTO_CODES:
            return 'crypto'
        
        # 期货
        if any(code.endswith(suffix) for suffix in cls.FUTURES_SUFFIXES):
            return 'futures'
        
        # ... 其他规则

2. UniversalDataFetcher统一数据获取器

职责: 封装所有数据源,提供统一接口

主要方法:

方法 功能 返回值
fetch() 获取单只标的 DataFrameNone
fetch_multiple() 批量获取 Dict[str, DataFrame]

数据获取流程:

def fetch(self, code, start_date, end_date, retry=3):
    for attempt in range(retry):
        try:
            # 1. 检测资产类型
            asset_type = AssetTypeDetector.detect(code)
            
            # 2. 路由到对应的获取方法
            if asset_type in ('china_index', 'china_etf', 'china_stock'):
                return self._fetch_china(code, start_date, end_date, asset_type)
            elif asset_type == 'futures':
                return self._fetch_futures(code, start_date, end_date)
            elif asset_type in ('hk_index', 'hk_stock', 'us_index', 'us_stock'):
                return self._fetch_yfinance(code, start_date, end_date, asset_type)
            elif asset_type == 'crypto':
                return self._fetch_crypto(code, start_date, end_date)
        except Exception as e:
            # 3. 重试机制
            if attempt < retry - 1:
                time.sleep(2)
            else:
                return None

3. 数据源适配器

3.1 Tushare 适配器 (_fetch_china)

功能: 获取A股数据指数、ETF、股票、期货

接口选择:

if asset_type == 'china_index':
    df = pro.index_daily(ts_code, ...)  # 指数日线
elif asset_type == 'china_etf':
    df = pro.fund_daily(ts_code, ...)   # 基金日线
    if 无数据:
        df = pro.daily(ts_code, ...)    # 股票日线
elif asset_type == 'futures':
    df = pro.fut_daily(ts_code, ...)    # 期货日线

代理处理:

# Tushare 是国内服务,需要临时清除代理
original_proxy = {}
for key in ["HTTP_PROXY", "HTTPS_PROXY", ...]:
    original_proxy[key] = os.environ.pop(key, None)

try:
    # 调用 Tushare API
    df = pro.index_daily(...)
finally:
    # 恢复代理设置
    for key, value in original_proxy.items():
        if value is not None:
            os.environ[key] = value

3.2 YFinance 适配器 (_fetch_yfinance)

功能: 获取港股、美股数据

代码转换:

# 使用映射表转换代码
yf_code = AssetTypeDetector.YF_CODE_MAP.get(code, code)

# 美股指数需要加 ^ 前缀
if asset_type == 'us_index' and not yf_code.startswith('^'):
    yf_code = f'^{yf_code}'

日期处理:

# yfinance 的 end 参数是排他的,需要加一天
end_date_obj = pd.Timestamp(end_date) + timedelta(days=1)
data = ticker.history(start=start_date, end=end_date_obj)

3.3 CCXT 适配器 (_fetch_crypto)

功能: 获取加密货币数据(通过 OKX 交易所)

实现: 直接复用 HybridDataSource._fetch_ccxt()

4. 标准化输出

所有数据源返回统一格式:

# DataFrame 结构
Index: DatetimeIndex (日期)
Columns:
  - open:   开盘价 (float)
  - high:   最高价 (float)
  - low:    最低价 (float)
  - close:  收盘价 (float)
  - volume: 成交量 (float)
  - code:   标的代码 (str)

标准化代码:

# 统一列名
df = df.rename(columns={
    "trade_date": "date",
    "vol": "volume",
    "Open": "open",
    "High": "high",
    "Low": "low",
    "Close": "close",
})

# 设置日期索引
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")
df = df.sort_index()

# 选择需要的列
cols = ['open', 'high', 'low', 'close', 'volume']
available = [c for c in cols if c in df.columns]
df = df[available]
df['code'] = code

设计模式

1. 策略模式 (Strategy Pattern)

不同的数据源实现了相同的接口(获取 OHLCV 数据),根据资产类型动态选择策略。

数据源策略:
  ├─ TushareStrategy    (中国A股)
  ├─ YFinanceStrategy   (港美股)
  └─ CCXTStrategy       (加密货币)

2. 外观模式 (Facade Pattern)

UniversalDataFetcher 作为外观,隐藏了底层多个数据源的复杂性,提供简化的接口。

用户 → UniversalDataFetcher → [Tushare, YFinance, CCXT]

3. 上下文管理器 (Context Manager)

支持 with 语句,自动管理 SSH 隧道的生命周期。

with UniversalDataFetcher(ssh_config=ssh_config) as fetcher:
    df = fetcher.fetch(...)
# SSH 隧道自动关闭

错误处理策略

1. 重试机制

for attempt in range(retry):
    try:
        return self._fetch_xxx(...)
    except Exception as e:
        if attempt < retry - 1:
            time.sleep(2)  # 等待后重试
        else:
            return None    # 重试失败

2. 优雅降级

  • 数据获取失败时返回 None,不抛出异常
  • 批量获取时单个失败不影响其他标的

3. 详细日志

print(f"✓ {code}: {len(df)} 条")  # 成功
print(f"✗ {code}: 无数据")        # 失败

性能优化

1. 批量分组

# 按资产类型分组,减少重复检测
grouped = {}
for code in codes:
    asset_type = AssetTypeDetector.detect(code)
    if asset_type not in grouped:
        grouped[asset_type] = []
    grouped[asset_type].append(code)

2. 延迟加载

# Tushare token 延迟加载
def _get_tushare_token(self):
    if self._tushare_token is None:
        self._tushare_token = os.getenv("TUSHARE_TOKEN")
    return self._tushare_token

3. 限流处理

# YFinance 添加延迟避免限流
time.sleep(0.5)
data = ticker.history(...)

扩展性设计

添加新的资产类型

步骤1: 在 AssetTypeDetector 中添加检测规则

class AssetTypeDetector:
    NEW_ASSET_CODES = {'CODE1', 'CODE2'}
    
    @classmethod
    def detect(cls, code: str) -> str:
        if code in cls.NEW_ASSET_CODES:
            return 'new_asset'
        # ... 其他规则

步骤2: 在 UniversalDataFetcher 中添加获取方法

class UniversalDataFetcher:
    def _fetch_new_asset(self, code, start_date, end_date):
        # 实现数据获取逻辑
        return df

步骤3: 在路由中添加分支

def fetch(self, code, ...):
    asset_type = AssetTypeDetector.detect(code)
    
    if asset_type == 'new_asset':
        return self._fetch_new_asset(code, start_date, end_date)
    # ... 其他分支

测试策略

1. 单元测试

  • 测试资产类型检测的准确性
  • 测试各种代码格式的识别

2. 集成测试

  • 测试真实数据获取(需要网络)
  • 测试不同资产类型的数据获取

3. 边界测试

  • 无效代码处理
  • 空数据范围
  • 网络异常处理

与现有代码的关系

现有代码:
  HybridDataSource (轮动策略使用)
    ├─ _fetch_tushare()
    ├─ _fetch_yfinance()
    ├─ _fetch_ccxt()
    └─ fetch_all()

新增代码:
  UniversalDataFetcher (通用接口)
    ├─ 复用 HybridDataSource 的底层方法
    ├─ 增加资产类型检测
    ├─ 增加A股股票支持
    ├─ 简化API
    └─ 提供便捷函数

兼容性:

  • 不修改现有 HybridDataSource
  • 新接口是对现有代码的封装和扩展
  • 现有轮动策略代码无需修改

未来改进方向

  1. 缓存机制: 添加本地缓存,减少重复请求
  2. 异步支持: 使用 asyncio 提高批量获取性能
  3. 分钟级数据: 扩展支持分钟级K线
  4. 实时数据: 接入实时行情接口
  5. 数据质量检查: 自动检测异常数据(停牌、涨跌停等)

总结

UniversalDataFetcher 通过以下设计实现了"一个接口,所有资产"的目标:

自动识别: 智能检测资产类型
自动路由: 选择最优数据源
统一格式: 返回标准化 DataFrame
容错处理: 重试、降级、日志
易于扩展: 支持新资产类型
向后兼容: 不破坏现有代码