- 修正CAGR计算,去除NaN并检查起始值有效性以避免异常结果 - 优化混合数据源的数据对齐逻辑,使用配置结束日期与A股最新数据日期的较早者 - 计算因子时对齐A股交易日历,重新基于对齐价格计算日收益率,改进因子对齐准确度 - 轮动策略中跳过空信号,避免空信号影响持仓和调仓逻辑 - 调整信号处理,过滤空字符串和NaN,保证轮动信号数据有效性 - 多品种轮动持仓中加入空信号判断,避免无效信号导致错误 - 调整调仓明细和品种汇总保存逻辑,增加空文件创建以保证输出路径文件稳定生成 - 完善多处打印信息和注释,增强代码可读性与调试便利性
204 lines
4.7 KiB
Python
204 lines
4.7 KiB
Python
"""
|
||
通用工具函数
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
|
||
|
||
def format_date(date_str: str, output_format: str = "%Y-%m-%d") -> str:
    """
    Normalize a date string into a single output format.

    Args:
        date_str: Input date string. The layouts ``YYYY-MM-DD``, ``YYYYMMDD``
            and ``YYYY/MM/DD`` are tried in that order.
        output_format: ``strftime`` pattern used to render the result.

    Returns:
        str: The parsed date rendered with ``output_format``.

    Raises:
        ValueError: If no accepted layout yields a result for ``date_str``.
    """
    accepted_layouts = ("%Y-%m-%d", "%Y%m%d", "%Y/%m/%d")

    for layout in accepted_layouts:
        try:
            # Parsing and rendering share one guard: a ValueError raised by
            # either step simply moves on to the next candidate layout.
            return datetime.strptime(date_str, layout).strftime(output_format)
        except ValueError:
            pass

    raise ValueError(f"无法解析日期格式: {date_str}")
|
||
|
||
|
||
def get_date_range(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    lookback_days: int = 365,
) -> tuple[str, str]:
    """
    Build a (start, end) date pair.

    Args:
        start_date: Start date string; when None the start is derived by
            subtracting ``lookback_days`` from the end date.
        end_date: End date string; when None the current local time is used.
        lookback_days: Number of days to look back when ``start_date`` is None.

    Returns:
        tuple: ``(start_date, end_date)``, both formatted as ``YYYY-MM-DD``.
    """
    iso_layout = "%Y-%m-%d"

    # The end is resolved first because the default start depends on it.
    range_end = (
        datetime.now()
        if end_date is None
        else datetime.strptime(format_date(end_date), iso_layout)
    )

    if start_date is not None:
        range_start = datetime.strptime(format_date(start_date), iso_layout)
    else:
        range_start = range_end - timedelta(days=lookback_days)

    return range_start.strftime(iso_layout), range_end.strftime(iso_layout)
|
||
|
||
|
||
def calculate_cagr(
    nav_series: pd.Series,
    method: str = "natural_days",
) -> float:
    """
    Compute the compound annual growth rate (CAGR) of a NAV series.

    Args:
        nav_series: Net-asset-value series. For ``natural_days`` the index
            labels are subtracted and the result's ``.days`` is read.
        method: ``'natural_days'`` (calendar days between the first and last
            index labels, divided by 365) or ``'trading_days'`` (number of
            observations divided by 252).

    Returns:
        float: The CAGR. 0.0 is returned when fewer than two non-NaN values
        remain, when the first value is not positive, or when the computed
        year span is not positive.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    # NaN entries are discarded before anything else is inspected.
    clean = nav_series.dropna()
    if len(clean) < 2:
        return 0.0

    first_nav, last_nav = clean.iloc[0], clean.iloc[-1]

    # A missing or non-positive starting value makes the growth ratio
    # meaningless, so fall back to 0.0.
    usable = pd.notna(first_nav) and pd.notna(last_nav) and first_nav > 0
    if not usable:
        return 0.0

    if method == "trading_days":
        years = len(clean) / 252.0
    elif method == "natural_days":
        elapsed = clean.index[-1] - clean.index[0]
        years = elapsed.days / 365.0
    else:
        raise ValueError(f"不支持的CAGR计算方式: {method}")

    if years <= 0:
        return 0.0

    growth = last_nav / first_nav
    return growth ** (1 / years) - 1
|
||
|
||
|
||
def calculate_max_drawdown(nav_series: pd.Series) -> tuple[float, datetime, datetime]:
    """
    Compute the maximum drawdown of a NAV series.

    Args:
        nav_series: Net-asset-value series. The returned start/end values are
            labels taken from its index.

    Returns:
        tuple: ``(max drawdown ratio, peak label, trough label)``. The ratio
        is ``(nav - running peak) / running peak`` at its minimum, so it is
        zero or negative. When the series never falls below its running peak
        the ratio is 0 and both labels are the first index label.

    Raises:
        ValueError: If ``nav_series`` is empty.
    """
    if nav_series.empty:
        raise ValueError("净值序列为空,无法计算最大回撤")

    running_peak = nav_series.cummax()
    drawdown = (nav_series - running_peak) / running_peak

    max_dd = drawdown.min()
    end_idx = drawdown.idxmin()

    # Slice by label with .loc: it always includes end_idx itself. Plain
    # ``nav_series[:end_idx]`` is positional (end-exclusive) on an
    # integer-labelled index, which yields an empty slice, and a failing
    # idxmax(), whenever the trough is the first row.
    start_idx = nav_series.loc[:end_idx].idxmax()

    return max_dd, start_idx, end_idx
|
||
|
||
|
||
def calculate_sharpe(
    returns: pd.Series,
    rf: float = 0.0,
    periods: int = 252,
) -> float:
    """
    Compute the annualized Sharpe ratio.

    Args:
        returns: Per-period return series (daily by default).
        rf: Annualized risk-free rate; it is divided by ``periods`` before
            being subtracted from each return.
        periods: Annualization factor (number of periods per year).

    Returns:
        float: ``mean(excess) / std(excess) * sqrt(periods)``. 0.0 is returned
        when the standard deviation is zero or undefined (for example an
        empty series or a single observation).
    """
    excess_returns = returns - rf / periods
    volatility = excess_returns.std()

    # std() is NaN with fewer than two observations, and ``NaN == 0`` is
    # False, so the NaN case must be checked explicitly or NaN leaks out.
    if pd.isna(volatility) or volatility == 0:
        return 0.0

    return excess_returns.mean() / volatility * np.sqrt(periods)
|
||
|
||
|
||
def resample_data(
    df: pd.DataFrame,
    timeframe: str,
    time_col: str = "time",
) -> pd.DataFrame:
    """
    Resample OHLCV data to a coarser period.

    Args:
        df: Source data with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.
        timeframe: Target period; one of ``'1D'``, ``'1W'``, ``'1M'``,
            ``'3M'``, ``'1Y'``.
        time_col: Name of the time column. When it is not a column of ``df``
            the existing index is resampled as-is (presumably it is already
            datetime-like; verify against callers).

    Returns:
        DataFrame: The resampled bars with the index reset to a column. Rows
        containing any NaN after aggregation are dropped. For an unsupported
        ``timeframe`` the input ``df`` object itself is returned unchanged.
    """
    rule_by_timeframe = {
        "1D": "D",
        "1W": "W",
        "1M": "M",
        "3M": "3M",
        "1Y": "Y",
    }
    rule = rule_by_timeframe.get(timeframe)
    if rule is None:
        return df

    # Work on a copy so the caller's frame is never modified.
    working = df.copy()
    if time_col in working.columns:
        working[time_col] = pd.to_datetime(working[time_col])
        working = working.set_index(time_col)

    # One aggregation per OHLCV column.
    ohlcv_aggregation = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "volume": "sum",
    }
    bars = working.resample(rule).agg(ohlcv_aggregation)

    return bars.dropna().reset_index()
|
||
|
||
|
||
def safe_divide(a: float, b: float, default: float = 0.0) -> float:
    """Divide ``a`` by ``b``, returning ``default`` when ``b`` equals zero."""
    if b == 0:
        return default
    return a / b
|
||
|
||
|
||
def truncate_string(s: str, max_length: int = 50, suffix: str = "...") -> str:
    """
    Truncate a string to at most ``max_length`` characters.

    Args:
        s: Text to shorten.
        max_length: Maximum length of the result, suffix included.
        suffix: Marker appended when the text is cut.

    Returns:
        str: ``s`` unchanged when it already fits; otherwise a prefix of ``s``
        followed by ``suffix``. When ``max_length`` is too small to hold the
        suffix, the text is hard-cut to ``max_length`` characters with no
        suffix (an empty string for a non-positive ``max_length``).
    """
    if len(s) <= max_length:
        return s

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice bound would count from the end of the string and
        # produce a result longer than max_length, so cut hard instead.
        return s[: max(max_length, 0)]

    return s[:keep] + suffix
|