""" akshare数据源实现(用于CCI筛选等场景) """ import time import pandas as pd import akshare as ak from typing import Optional from .base import DataSource class AkshareDataSource(DataSource): """基于akshare的数据源""" def __init__(self, delay: float = 3.0): """ 初始化akshare数据源 Args: delay: 每次请求间隔秒数(避免触发限流) """ self.delay = delay def fetch_ohlcv( self, code: str, start_date: str, end_date: str, fields: Optional[list] = None, ) -> pd.DataFrame: """ 获取指数历史数据(使用akshare的东方财富接口) Args: code: 指数代码,如 '000300'(不带后缀) start_date: 起始日期 'YYYY-MM-DD' end_date: 结束日期 'YYYY-MM-DD' Returns: DataFrame,包含 date, open, high, low, close, volume """ # 转换日期格式 sd = start_date.replace("-", "") ed = end_date.replace("-", "") # 去除后缀 symbol = code.replace(".SH", "").replace(".SZ", "") time.sleep(self.delay) try: df = ak.index_zh_a_hist( symbol=symbol, period="daily", start_date=sd, end_date=ed, ) except Exception as e: raise RuntimeError(f"akshare查询失败 [{code}]: {e}") if df is None or df.empty: raise ValueError(f"akshare返回空数据: {code}") # 统一列名 df = df.rename( columns={ "日期": "date", "开盘": "open", "最高": "high", "最低": "low", "收盘": "close", "成交量": "volume", } ) df["date"] = pd.to_datetime(df["date"]) df["code"] = code df = df[["date", "code", "open", "high", "low", "close", "volume"]] df = df.sort_values("date").reset_index(drop=True) return df def fetch_multiple( self, codes: list, start_date: str, end_date: str, ) -> tuple[pd.DataFrame, list]: """批量获取数据""" df_list = [] failed = [] for code in codes: try: df = self.fetch_ohlcv(code, start_date, end_date) df_list.append(df[["date", "code", "close"]].copy()) except Exception as e: print(f" ⚠ 跳过 {code}: {e}") failed.append(code) if not df_list: raise RuntimeError("所有数据获取失败") all_df = pd.concat(df_list, ignore_index=True) data = all_df.pivot(index="date", columns="code", values="close") data = data.sort_index() return data, failed def fetch_index_list(self) -> pd.DataFrame: """ 获取所有A股指数列表 Returns: DataFrame with index info """ sources = ["沪深重要指数", "上证系列指数", "深证系列指数", "中证系列指数"] df_list = [] for source in sources: try: df = ak.stock_zh_index_spot_em(symbol=source) df["source"] = source df_list.append(df) except Exception as e: print(f" ⚠ 获取 {source} 失败: {e}") return pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()