- 修正CAGR计算,去除NaN并检查起始值有效性以避免异常结果 - 优化混合数据源的数据对齐逻辑,使用配置结束日期与A股最新数据日期的较早者 - 计算因子时对齐A股交易日历,重新基于对齐价格计算日收益率,改进因子对齐准确度 - 轮动策略中跳过空信号,避免空信号影响持仓和调仓逻辑 - 调整信号处理,过滤空字符串和NaN,保证轮动信号数据有效性 - 多品种轮动持仓中加入空信号判断,避免无效信号导致错误 - 调整调仓明细和品种汇总保存逻辑,增加空文件创建以保证输出路径文件稳定生成 - 完善多处打印信息和注释,增强代码可读性与调试便利性
192 lines
6.1 KiB
Python
192 lines
6.1 KiB
Python
"""
|
||
动量因子计算模块
|
||
|
||
支持两种动量因子:
|
||
1. N日涨幅(简单动量)
|
||
2. 斜率×R²趋势得分(改进版)
|
||
"""
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
from sklearn.linear_model import LinearRegression
|
||
|
||
|
||
def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
|
||
"""
|
||
计算 N 日涨幅(简单动量)
|
||
|
||
Args:
|
||
price_series: 价格序列
|
||
n: 动量窗口天数
|
||
|
||
Returns:
|
||
Series: N日涨幅
|
||
"""
|
||
return price_series / price_series.shift(n + 1) - 1.0
|
||
|
||
|
||
def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
|
||
"""
|
||
单次计算斜率×R²趋势得分
|
||
|
||
Args:
|
||
srs: 价格窗口序列(长度为 n)
|
||
n: 窗口长度
|
||
|
||
Returns:
|
||
float: 斜率 × R² × 10000
|
||
"""
|
||
if srs.shape[0] < n:
|
||
return np.nan
|
||
|
||
x = np.arange(1, n + 1).reshape(-1, 1)
|
||
y = srs.values / srs.values[0] # 归一化
|
||
|
||
lr = LinearRegression().fit(x, y)
|
||
slope = lr.coef_[0]
|
||
r_squared = lr.score(x, y)
|
||
score = 10000 * slope * r_squared
|
||
|
||
return score
|
||
|
||
|
||
def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
|
||
"""
|
||
计算斜率×R²趋势得分序列
|
||
|
||
Args:
|
||
price_series: 价格序列
|
||
n: 滚动窗口天数
|
||
|
||
Returns:
|
||
Series: 趋势得分序列
|
||
"""
|
||
return price_series.rolling(n).apply(
|
||
lambda x: _slope_r2_score(x, n), raw=False
|
||
)
|
||
|
||
|
||
def calculate_daily_return(price_series: pd.Series) -> pd.Series:
|
||
"""
|
||
计算日收益率
|
||
|
||
Args:
|
||
price_series: 价格序列
|
||
|
||
Returns:
|
||
Series: 日收益率
|
||
"""
|
||
return price_series / price_series.shift(1) - 1
|
||
|
||
|
||
def _is_china_index(code: str) -> bool:
|
||
"""判断是否为A股指数"""
|
||
return code.endswith('.SH') or code.endswith('.SZ') or code.endswith('.SS')
|
||
|
||
|
||
def compute_factors(
|
||
index_data: pd.DataFrame,
|
||
code_list: list,
|
||
n: int = 25,
|
||
factor_type: str = "slope_r2",
|
||
etf_data: pd.DataFrame = None,
|
||
code_config: dict = None,
|
||
) -> tuple[pd.DataFrame, list]:
|
||
"""
|
||
计算所有指数的因子和日收益率(横截面策略版本)
|
||
|
||
核心逻辑:
|
||
1. 每个标的按照自己的交易日历计算技术指标
|
||
2. 对齐到A股交易日历(取离A股交易日最近的有效数据,不使用未来数据)
|
||
3. 严格控制T+1规则:T日收盘计算信号,使用T日及之前的数据
|
||
|
||
Args:
|
||
index_data: 指数价格数据(宽格式,已对齐到A股交易日历,非A股可能有NaN)
|
||
code_list: 指数代码列表
|
||
n: 动量/趋势窗口
|
||
factor_type: 'momentum' 或 'slope_r2'
|
||
etf_data: ETF价格数据(宽格式,用于收益计算)
|
||
code_config: 代码配置字典 {code: {name, etf, market}}
|
||
|
||
Returns:
|
||
tuple: (result_df, valid_codes)
|
||
- result_df: 包含因子得分和日收益率的DataFrame(按A股交易日对齐)
|
||
- valid_codes: 有效代码列表
|
||
"""
|
||
code_config = code_config or {}
|
||
|
||
# 如果没有提供ETF数据,创建一个空的DataFrame
|
||
if etf_data is None:
|
||
etf_data = pd.DataFrame()
|
||
|
||
# 获取A股交易日历(index_data的索引)
|
||
a_share_dates = index_data.index
|
||
|
||
# 过滤有效代码
|
||
valid_codes = []
|
||
for code in code_list:
|
||
if code not in index_data.columns:
|
||
print(f" ⚠ 跳过 {code}: 不在数据中")
|
||
continue
|
||
valid_codes.append(code)
|
||
|
||
# 为每个标的单独计算指标,然后对齐到A股交易日历
|
||
result = pd.DataFrame(index=a_share_dates)
|
||
|
||
for code in valid_codes:
|
||
# 获取该标的的原始价格数据(去除NaN)
|
||
price_series = index_data[code].dropna()
|
||
|
||
if len(price_series) < n + 1:
|
||
print(f" ⚠ 剔除 {code}: 数据不足 ({len(price_series)} < {n+1})")
|
||
valid_codes.remove(code)
|
||
continue
|
||
|
||
# 按照该标的自己的交易日历计算指标(使用指数数据)
|
||
if factor_type == "momentum":
|
||
factor_series = calculate_momentum(price_series, n)
|
||
elif factor_type == "slope_r2":
|
||
factor_series = calculate_slope_r2(price_series, n)
|
||
else:
|
||
raise ValueError(f"不支持的因子类型: {factor_type}")
|
||
|
||
# 对齐到A股交易日历:价格使用ffill,指标使用ffill
|
||
# 但日收益率需要基于对齐后的价格重新计算,而不是直接ffill
|
||
price_aligned = price_series.reindex(a_share_dates, method='ffill')
|
||
factor_aligned = factor_series.reindex(a_share_dates, method='ffill')
|
||
|
||
# 基于对齐后的价格重新计算日收益率
|
||
# 这样如果T日没有交易(价格被ffill),日收益率为0
|
||
return_aligned = calculate_daily_return(price_aligned)
|
||
|
||
result[code] = price_aligned
|
||
result[f"得分_{code}"] = factor_aligned
|
||
result[f"日收益率_{code}"] = return_aligned
|
||
|
||
# 过滤掉缺失值过多的指数(基于A股交易日历)
|
||
total_rows = len(result)
|
||
final_valid_codes = []
|
||
for code in valid_codes:
|
||
null_pct = result[code].isnull().sum() / total_rows
|
||
if null_pct > 0.2:
|
||
print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高")
|
||
result = result.drop(columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore')
|
||
else:
|
||
final_valid_codes.append(code)
|
||
|
||
# 注意:不做dropna,保留所有A股交易日
|
||
# 非A股标的在没有数据的日子,得分和日收益率会保持NaN或前向填充值
|
||
# 这是正常的横截面策略行为:T日只交易有数据的标的
|
||
score_cols = [f"得分_{code}" for code in final_valid_codes]
|
||
|
||
print("\n因子计算完成:")
|
||
print(f" 因子类型: {factor_type}")
|
||
print(f" 窗口天数: {n}")
|
||
print(f" 有效指数: {len(final_valid_codes)}/{len(code_list)}")
|
||
print(f" 有效数据: {len(result)} 行")
|
||
print(f" 时间范围: {result.index[0].date()} ~ {result.index[-1].date()}")
|
||
if etf_data is not index_data and not etf_data.empty:
|
||
print(f" 使用ETF数据计算收益: ✓")
|
||
|
||
return result, final_valid_codes
|