feat(framework_v2): 实现通用配置系统,支持扁平化资产池和策略分组

- 使用 Pydantic Schema 验证配置类型安全
- 实现扁平化 AssetPool,移除预设分类(equity/commodity/fixed_income)
- 移除 MarketType 枚举,改用 group 字符串字段实现策略分组
- AssetConfig 引入 signal_source/trade_source 分离,支持跨市场场景
- ConfigLoader 支持通用 StrategyConfig,向后兼容 RotationStrategyConfig
- 新增 GroupConfig 替代 MarketGroupConfig,支持分散化选股

重构核心:
- market → group(策略分组语义,组内竞争强制分散)
- by_market → by_group
- MarketGroupConfig → GroupConfig
This commit is contained in:
2026-05-24 14:25:25 +08:00
parent 99d3584d05
commit 341611c32b
3 changed files with 763 additions and 0 deletions

View File

@@ -0,0 +1,444 @@
"""
配置 Schema 定义
使用 Pydantic 验证配置文件的类型安全
"""
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional, Dict, List, Literal
from enum import Enum
from datetime import date
# ============================================================
# 枚举类型
# ============================================================
# 注意:不再使用 MarketType 枚举,改用字符串类型的 group 字段
# group 字段用于策略分组(组内竞争,强制分散)
class FactorType(str, Enum):
"""因子类型枚举"""
MOMENTUM = "momentum"
SLOPE_R2 = "slope_r2"
WEIGHTED_MOMENTUM = "weighted_momentum"
class PremiumMode(str, Enum):
"""溢价控制模式"""
FILTER = "filter" # 完全排除
PENALIZE = "penalize" # 降权
class ThresholdMode(str, Enum):
"""阈值模式"""
FIXED = "fixed" # 固定阈值
DYNAMIC = "dynamic" # 动态阈值
class DataSourceType(str, Enum):
"""数据源类型"""
FLASK_API = "flask_api"
TUSHARE = "tushare"
YFINANCE = "yfinance"
# ============================================================
# 资产池 Schema扁平化设计
# ============================================================
class PremiumConfig(BaseModel):
"""溢价控制配置"""
enabled: bool = Field(default=True, description="是否启用")
threshold: float = Field(default=0.10, ge=0, le=1.0, description="溢价阈值")
class AssetConfig(BaseModel):
"""
标的配置(通用,支持所有场景)
场景 1指数信号 → 指数收益
signal_source: "NDX"
trade_source: "NDX"
场景 2指数信号 → ETF收益
signal_source: "NDX"
trade_source: "513100.SH"
场景 3ETF信号 → ETF收益
signal_source: "518880.SH"
trade_source: "518880.SH"
场景 4个股信号 → 个股收益
signal_source: "AAPL"
trade_source: "AAPL"
"""
name: str = Field(..., description="标的名称")
group: str = Field(..., description="策略分组(组内竞争,强制分散)")
# 核心字段:信号来源和交易来源
signal_source: str = Field(..., description="信号来源代码(计算因子用)")
trade_source: str = Field(..., description="交易来源代码(计算收益用)")
# 可选字段
description: Optional[str] = Field(None, description="标的描述")
etf: Optional[str] = Field(None, description="ETF代码兼容旧配置")
premium_control: Optional["PremiumConfig"] = Field(None, description="溢价控制配置")
@property
def is_cross_market(self) -> bool:
"""是否跨市场(信号和交易不同)"""
return self.signal_source != self.trade_source
class AssetPool(BaseModel):
"""
资产池(扁平化设计)
优势:
1. 不预设分类,支持任意策略分组
2. 通过 group 字段自动分组
3. 配置简单直观
4. 易于扩展新分组
示例:
asset_pool:
assets:
"NDX":
name: "纳指100"
group: "US_TECH"
signal_source: "NDX"
trade_source: "513100.SH"
"BTC":
name: "比特币"
group: "CRYPTO"
signal_source: "BTC"
trade_source: "BTC"
"""
assets: Dict[str, AssetConfig] = Field(
default_factory=dict,
description="所有标的flat结构"
)
@property
def by_group(self) -> Dict[str, Dict[str, AssetConfig]]:
"""按策略分组"""
groups = {}
for code, asset in self.assets.items():
group = asset.group
if group not in groups:
groups[group] = {}
groups[group][code] = asset
return groups
@property
def groups(self) -> list:
"""获取所有策略分组"""
return list(self.by_group.keys())
def get_signal_codes(self, group: str = None) -> list:
"""
获取信号标的
Args:
group: 可选,过滤特定分组(如 'US_TECH'
"""
if group:
return [
asset.signal_source
for asset in self.assets.values()
if asset.group == group
]
return [asset.signal_source for asset in self.assets.values()]
def get_trade_codes(self, group: str = None) -> list:
"""获取交易标的"""
if group:
return [
asset.trade_source
for asset in self.assets.values()
if asset.group == group
]
return [asset.trade_source for asset in self.assets.values()]
def get_signal_to_trade_mapping(self, group: str = None) -> Dict[str, str]:
"""获取信号→交易映射"""
mapping = {}
for asset in self.assets.values():
if group and asset.group != group:
continue
mapping[asset.signal_source] = asset.trade_source
return mapping
def count(self, group: str = None) -> int:
"""获取标的数量"""
if group:
return len([a for a in self.assets.values() if a.group == group])
return len(self.assets)
@field_validator('assets')
@classmethod
def check_non_empty(cls, v):
"""至少配置一个标的"""
if not v:
import warnings
warnings.warn("资产池为空")
return v
# ============================================================
# 因子配置 Schema
# ============================================================
class FactorConfig(BaseModel):
"""因子配置"""
type: FactorType = Field(default=FactorType.WEIGHTED_MOMENTUM, description="因子类型")
n_days: int = Field(default=25, ge=5, le=250, description="动量窗口期(天数)")
# 动态周期参数
auto_day: bool = Field(default=False, description="是否启用动态周期")
min_days: int = Field(default=20, ge=5, le=100, description="最小周期")
max_days: int = Field(default=60, ge=20, le=250, description="最大周期")
# ============================================================
# 阈值配置 Schema
# ============================================================
class DynamicThresholdConfig(BaseModel):
"""动态阈值配置"""
reference: str = Field(..., description="参考标的代码")
ratio: float = Field(default=1.0, gt=0, description="倍数")
fallback_enabled: bool = Field(default=True, description="参考不可用时是否回退")
fallback_value: float = Field(default=0.0, description="回退值")
class ThresholdConfig(BaseModel):
"""阈值配置(统一 V2/V3"""
mode: ThresholdMode = Field(default=ThresholdMode.DYNAMIC, description="阈值模式")
fixed_value: float = Field(default=0.0, description="固定阈值mode=fixed时使用")
dynamic: Optional[DynamicThresholdConfig] = Field(None, description="动态阈值mode=dynamic时使用")
@model_validator(mode='after')
def validate_dynamic_config(self):
"""验证动态阈值配置"""
if self.mode == ThresholdMode.DYNAMIC and self.dynamic is None:
raise ValueError("dynamic 模式必须配置 dynamic 字段")
return self
# ============================================================
# 轮动配置 Schema
# ============================================================
class GroupConfig(BaseModel):
"""策略分组配置(用于分散化选股)"""
group: str = Field(..., description="策略分组名称")
select_num: int = Field(default=1, ge=1, le=10, description="该分组选股数量")
class RotationConfig(BaseModel):
"""
轮动配置
支持两种选股模式:
1. 全局选股diversified=false直接选 Top-N
2. 分散化选股diversified=true按策略分组选股
"""
select_num: int = Field(default=3, ge=1, le=20, description="选股数量(全局模式)")
diversified: bool = Field(default=False, description="是否分散化选股")
# 分散化配置(可选)
diversification_groups: Optional[List[GroupConfig]] = Field(
None,
description="分散化分组配置diversified=true时使用"
)
threshold: ThresholdConfig = Field(
default_factory=ThresholdConfig,
description="动量阈值"
)
# ============================================================
# 调仓配置 Schema
# ============================================================
class RebalanceConfig(BaseModel):
"""调仓配置"""
min_hold_days: int = Field(default=1, ge=1, le=30, description="最低持有天数")
score_threshold: float = Field(default=0.0, ge=0, le=0.5, description="调仓得分阈值(%")
trade_cost: float = Field(default=0.001, ge=0, le=0.01, description="单次换仓成本")
# ============================================================
# 溢价控制 Schema
# ============================================================
class MarketPremiumOverride(BaseModel):
"""市场溢价覆盖配置"""
enabled: bool = Field(default=True, description="是否启用")
threshold: float = Field(default=0.10, ge=0, le=1.0, description="溢价阈值")
class PremiumControlConfig(BaseModel):
"""溢价控制配置"""
enabled: bool = Field(default=True, description="是否启用溢价控制")
default_threshold: float = Field(default=0.10, ge=0, le=1.0, description="默认溢价阈值")
mode: PremiumMode = Field(default=PremiumMode.FILTER, description="控制模式")
penalty_factor: float = Field(default=0.5, ge=0, le=1.0, description="降权惩罚系数")
# 按市场覆盖
market_overrides: Dict[str, MarketPremiumOverride] = Field(
default_factory=dict,
description="按市场覆盖配置"
)
# ============================================================
# 数据源配置 Schema
# ============================================================
class DataSourceConfig(BaseModel):
"""单个数据源配置"""
type: DataSourceType = Field(..., description="数据源类型")
enabled: bool = Field(default=True, description="是否启用")
timeout: int = Field(default=120, ge=10, le=600, description="超时时间(秒)")
# Flask API 特定配置
url: Optional[str] = Field(None, description="API地址flask_api使用")
# Tushare 特定配置
token: Optional[str] = Field(None, description="Tushare Tokentushare使用")
class DataConfig(BaseModel):
"""数据配置"""
sources: List[DataSourceConfig] = Field(..., description="数据源列表(按优先级排序)")
use_cache: bool = Field(default=True, description="是否使用本地缓存")
cache_dir: str = Field(default="data_cache", description="缓存目录")
@field_validator('sources')
@classmethod
def check_at_least_one(cls, v):
"""至少配置一个数据源"""
if not v:
raise ValueError("必须配置至少一个数据源")
return v
# ============================================================
# 基准配置 Schema
# ============================================================
class BenchmarkConfig(BaseModel):
"""基准配置"""
code: str = Field(..., description="基准代码")
name: str = Field(..., description="基准名称")
# ============================================================
# 回测配置 Schema
# ============================================================
class BacktestConfig(BaseModel):
"""回测配置"""
start_date: str = Field(..., description="回测起始日期YYYY-MM-DD")
end_date: Optional[str] = Field(None, description="回测结束日期None表示至今")
# ============================================================
# 元数据 Schema
# ============================================================
class MetadataConfig(BaseModel):
"""配置元数据"""
version: str = Field(default="1.0.0", description="配置版本")
strategy: str = Field(default="rotation", description="策略名称")
description: str = Field(default="", description="配置描述")
last_updated: Optional[str] = Field(None, description="最后更新日期")
# ============================================================
# 完整策略配置 Schema
# ============================================================
class RotationStrategyConfig(BaseModel):
"""
ETF轮动策略完整配置
使用示例:
from framework_v2.config.schemas import RotationStrategyConfig
import yaml
with open('config/rotation.yaml') as f:
config_dict = yaml.safe_load(f)
config = RotationStrategyConfig(**config_dict)
"""
metadata: MetadataConfig = Field(default_factory=MetadataConfig, description="元数据")
# 资产池
asset_pools: AssetPool = Field(..., description="资产池配置")
# 基准
benchmark: BenchmarkConfig = Field(..., description="基准配置")
# 回测
backtest: BacktestConfig = Field(..., description="回测配置")
# 因子
factor: FactorConfig = Field(default_factory=FactorConfig, description="因子配置")
# 轮动
rotation: RotationConfig = Field(default_factory=RotationConfig, description="轮动配置")
# 调仓
rebalance: RebalanceConfig = Field(default_factory=RebalanceConfig, description="调仓配置")
# 溢价控制
premium_control: PremiumControlConfig = Field(default_factory=PremiumControlConfig, description="溢价控制")
# 数据
data: DataConfig = Field(..., description="数据配置")
# ============================================================
# 通用策略配置 SchemaV2 架构)
# ============================================================
class StrategyConfig(BaseModel):
"""
通用策略配置(支持所有策略类型)
使用示例:
from framework_v2.config import load_config
config = load_config('rotation.yaml')
"""
metadata: MetadataConfig = Field(default_factory=MetadataConfig, description="元数据")
# 资产池
asset_pools: AssetPool = Field(..., description="资产池配置")
# 基准
benchmark: BenchmarkConfig = Field(..., description="基准配置")
# 回测
backtest: BacktestConfig = Field(..., description="回测配置")
# 因子
factor: FactorConfig = Field(default_factory=FactorConfig, description="因子配置")
# 轮动(可选)
rotation: Optional[RotationConfig] = Field(None, description="轮动配置")
# 调仓
rebalance: RebalanceConfig = Field(default_factory=RebalanceConfig, description="调仓配置")
# 溢价控制
premium_control: PremiumControlConfig = Field(default_factory=PremiumControlConfig, description="溢价控制")
# 数据
data: DataConfig = Field(..., description="数据配置")