From 774758c3b026436465f2c927edba6b6cda4abc6a Mon Sep 17 00:00:00 2001 From: aszerW Date: Mon, 11 May 2026 23:24:11 +0800 Subject: [PATCH] =?UTF-8?q?feat(data):=20=E5=AE=9E=E7=8E=B0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E8=8E=B7=E5=8F=96=E5=B1=82=E6=8A=BD=E8=B1=A1=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - OHLCVData: 标准化K线数据结构 - DataSource: 数据源抽象接口(fetch/fetch_batch) - DataCache: 缓存抽象接口(get/set/is_fresh) - LocalFileCache: 本地文件缓存实现 - HybridDataSourceAdapter/TushareDataSource/YFinanceDataSource: 定制数据源适配器 --- framework/__init__.py | 8 + framework/data/__init__.py | 126 ++++++++++ .../data/__pycache__/__init__.cpython-312.pyc | Bin 0 -> 5476 bytes strategies/shared/data/__init__.py | 18 ++ strategies/shared/data/sources.py | 223 ++++++++++++++++++ 5 files changed, 375 insertions(+) create mode 100644 framework/data/__init__.py create mode 100644 framework/data/__pycache__/__init__.cpython-312.pyc create mode 100644 strategies/shared/data/__init__.py create mode 100644 strategies/shared/data/sources.py diff --git a/framework/__init__.py b/framework/__init__.py index 5ab2962..b2119c0 100644 --- a/framework/__init__.py +++ b/framework/__init__.py @@ -22,6 +22,9 @@ from framework.execution import Portfolio, Executor, BacktestExecutor, DryRunExe # 配置层 from framework.config import ConfigLoader, StrategyConfig +# 数据层抽象 +from framework.data import OHLCVData, DataSource, DataCache + __all__ = [ # 因子层 @@ -49,4 +52,9 @@ __all__ = [ # 配置层 'ConfigLoader', 'StrategyConfig', + + # 数据层 + 'OHLCVData', + 'DataSource', + 'DataCache', ] \ No newline at end of file diff --git a/framework/data/__init__.py b/framework/data/__init__.py new file mode 100644 index 0000000..9a507d8 --- /dev/null +++ b/framework/data/__init__.py @@ -0,0 +1,126 @@ +""" +数据层抽象接口(通用) + +只提供数据获取抽象接口,具体实现在strategies/shared/data/ +""" + +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Any +import pandas as pd +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class OHLCVData: + """ + OHLCV数据结构(通用) + + 标准化的K线数据格式 + """ + code: str + name: str = "" + start_date: datetime = None + end_date: datetime = None + + # OHLCV数据DataFrame + data: pd.DataFrame = None + + @property + def length(self) -> int: + """数据长度""" + return len(self.data) if self.data is not None else 0 + + def validate(self) -> bool: + """验证数据完整性""" + if self.data is None or self.data.empty: + return False + + required_cols = ['close'] + return all(col in self.data.columns for col in required_cols) + + def __repr__(self) -> str: + return f"OHLCVData(code={self.code}, name={self.name}, length={self.length})" + + +class DataSource(ABC): + """ + 数据源抽象接口 + + 所有数据源必须实现fetch方法 + """ + + name: str = "base" + + def __init__(self, **params): + """初始化数据源参数""" + self._params = params + + @abstractmethod + def fetch(self, code: str, start: str, end: str) -> OHLCVData: + """ + 获取单个标的的OHLCV数据 + + Args: + code: 标的代码 + start: 开始日期 (YYYY-MM-DD) + end: 结束日期 (YYYY-MM-DD) + + Returns: + OHLCVData对象 + """ + pass + + @abstractmethod + def fetch_batch(self, codes: List[str], start: str, end: str) -> Dict[str, OHLCVData]: + """ + 批量获取多个标的的OHLCV数据 + + Args: + codes: 标的代码列表 + start: 开始日期 + end: 结束日期 + + Returns: + {code: OHLCVData}字典 + """ + pass + + def get_supported_codes(self) -> List[str]: + """获取支持的数据源代码列表""" + return [] + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(name={self.name})" + + +class DataCache(ABC): + """ + 数据缓存抽象接口(通用) + + 支持本地缓存管理 + """ + + @abstractmethod + def get(self, code: str, start: str, end: str) -> Optional[OHLCVData]: + """从缓存获取数据""" + pass + + @abstractmethod + def set(self, code: str, data: OHLCVData) -> None: + """写入缓存""" + pass + + @abstractmethod + def is_fresh(self, code: str, max_age_days: int = 1) -> bool: + """检查缓存是否新鲜""" + pass + + @abstractmethod + def clear(self, code: Optional[str] = None) -> None: + """清空缓存""" + pass + + +# 导出抽象接口 +__all__ = ['OHLCVData', 'DataSource', 'DataCache'] \ No newline at end of file diff --git a/framework/data/__pycache__/__init__.cpython-312.pyc b/framework/data/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..db21a091a8db38dd65801f8acee39c45ba2cb13d GIT binary patch literal 5476 zcmbtXeQ;FO6~FIeci(I-!s$68t8%MVMiy&g)~}%Lm;r-+Qa> zDxFY+BsG#a3W*AmXq6xpLhB3>u-d=sO#cZ(r!uc~%1n71@|P>2qxG+zbMNkEH(AEc z^v&+M_nv$1z2}~D@A;kk7r);_;Cu2qx94wOLjHw~;^!z-X5WU&Fi}Z_sGQ2jxHuo- z<3dE>ur0*IxFh0-J0s4xE8>bv5sC7}m^NE>c7L+|cLI_>%%-M*@`V1+K4*TiZ!g|Jz)oE+d9((n*l@aJ^0R-7y4#;-k5%E{L2gd9*;fver|B+>L1@^veyotwFeJZiVxWP z&tCobRr}Oi(+4N)BjbjtE2h@jtr^Wmm!fNGv#OX%vu6g?3-gw+`Ti|djj|i%qGnt( zyOL^Hw8XaVsA-9hbsMH7J)SbVlL;jT!ih|Kn6unCM>M7w1~epSXl8d@i?SUvTG&3B zJq+z(qLBy(j^KBI=_7(BY7Uj(2tBG0anhDZTaJiJlc3L8Xt`<2RcLuM@1EPr7h!df zmrz3Ut8T5PgI7JhV#z51;MD@p+bc%upe{n)2lZga5%yc|#~*rZ%g#2~O?n&5;0eJ; zSge)mz~Ml%Qg{(F68@7wv(=({J=XI(_aFHX=8A!M-p^1uQX|R5goOVnT^) z?Uu(d72TB4T9%|GR8|wwl$KN1%(R|}a%HP4?ZT<&~C~el;a+n%oRz7Gy|;Wkg_5%RCE}17Mr} z>b6;2oxi7WadF<_r*jzy5uMy#&Mbxx(nmTVO!P|6b8Vz}p609>7Sp%D=$ZEiuASb; z>}j7G$Q}AH*K_;<%Mpzw4Xr|J)Oa2IXWM}s;vOX&_3D#L3E9g%&-L@w{98@er^#V% znB&O6a*<@n2f{Pl39h~9Kvy&wOUDyNSg`n}dn^H(`mL~_fy;+NJvv?>%e}EvOK8ug z^w#X1bB=jhiFM=gY}y!0MwOV++Efx-jZ@qK*3Fr zthu;k&0p)+oZ0&y1HJRYiaWWmz>btdi>J(tB`C2ND2cjQeqDPi-3`G=2K5XB{qc(s z(@ZKVkqs_XYy}7!KL7$MT=515Tl!n3yh}&LF-F1T%u_H3ZE zjn#NxHf3FnAq)tcvhKzZy9%2uE)3T4!z_9LNduC_K-d*5SVoq0Ev3tHxn+vHyHOZP z54l_y8eN39Y1BJ@-^BW}EoWLL)=cjD)co^ve|qlI_|*0%rUE;rq#e{E;d)DwWz(XQO#k8*!FL`A`Q3Sa}ezv61)WkD0o;voR&f~zG%jeQvo zKnSCd-FMVJ{t_VQ+**UbhB{^#E;*&Zkui!9!^xU7RD_D5qKGru`nhd36`A+J3Wgg9 zgo<02^@%V0m@n~`Sq_TDmH;rU*8?Lu{}R){fXrb+_5BvAbjyhUEbp>kK6G{ReMkc! z0e~O9=4>Y=!BScXLOMg;wu^ zZ|$~iYunnw6~eHK4I#``jvo6i(bBk|(S&23Ql(I3pZ*9^(2{Vw{zKUR36Vx`7N<4= zLvIB_<2~+(MJWe8BiY(Ak5JB!Ky(&EF2MZ~;`UQ#08D}zjPJDmzhPWSnOy%zGY19> zKKSh$|ECX(N+;NT!`H^fOKai2s<@(}e}VNYdP|L}27itngOc6%+8+$t`zK3-nBl_( z!*2mI=r=RGmzr7Lt-uw{E}n0G#(xASxF5*^G{jtw9(pHKhDj#@^y1R1piy{bymmS_ zun*38>9Vp@%M3=(#{$Hv@6=4$NT*Us-K1C#zRoT$Q>{w>E?CiE=j2MDuFt(tQ@oW_ zQ_Q1omKWayuMy*M{Jak_{-_8$7TUAU)fAm|%y=M)a?3J(=m5$gyfUbp2h9|?<^>@J z)lb3@1C0o<99%ry^h(p`!SKakc&hI1@w+A#oozVNFyWp2{#0NSB)W8~?crLs&>CE= z5y@I46k?mO){F#^MaP{mjNFVh^qh{_tG*SPT=pV+;tODvzk@ zBXzeTpk5LeUl)1D^1MWxi}JkB;U8IzMF%T73G%XDdI;eAaYv>#hZZc}H;ciSd#Y z=D{#~VyZjt^$ZU%Rp4pL@MXfTq?kFhqD5d;kl^#X}?GFz#Lxa_x3tU z5bT5g8t4aDr?8DPi156elkjIGfMJ>fZBe3KTKXrTF1_CD9!+0()&5|l9Efl7+N9Sn zcjR~WkqI_t`qbF;(2I1cc@g=4Lo|H6Ts?n~O;)($G%;W?NB=SO&x`E>+}Erh?=Rg; z6EwR(su9kr^NEbm1kw8Q-k+)#`sPpFcHbU(~~<^P$D;S+8%l@cX(pv_eZKI^wR|j zN!^Crp=(B0HaKUQB2TYSEjs@ZQOzQ`$^FwO&Q(~1y_%?(D-s}hWNM0@4a}`e8P8NK z8s=s&w(HA*glnqC78*xrY{9_GBFZ+b;bGPB&8n|QvK`3|B$y7cJBjwwA1x9ED#nLE zdP@-nLjw;11VRQBqg!Y(4sirp6J3*;i}yZ^`FJskK&=pUpjI5z!lV~f6a0$K4#5TI zV~b3--CS_>=(^X}GU4@eQgjlwA>0c*#^}?7MoP*E(kKj%*xgYby+&iN Optional[OHLCVData]: + """从缓存获取数据""" + cache_file = os.path.join(self.cache_dir, f"{code}.csv") + + if not os.path.exists(cache_file): + return None + + try: + df = pd.read_csv(cache_file, index_col=0, parse_dates=True) + + # 过滤日期范围 + df = df.loc[start:end] + + if df.empty: + return None + + return OHLCVData( + code=code, + data=df, + start_date=df.index.min(), + end_date=df.index.max() + ) + except Exception as e: + print(f"缓存读取失败: {code} - {e}") + return None + + def set(self, code: str, data: OHLCVData) -> None: + """写入缓存""" + if data.data is None: + return + + cache_file = os.path.join(self.cache_dir, f"{code}.csv") + + # 如果已存在,追加新数据 + if os.path.exists(cache_file): + existing = pd.read_csv(cache_file, index_col=0, parse_dates=True) + combined = pd.concat([existing, data.data]).drop_duplicates() + combined.to_csv(cache_file) + else: + data.data.to_csv(cache_file) + + def is_fresh(self, code: str, max_age_days: int = 1) -> bool: + """检查缓存是否新鲜""" + meta_file = os.path.join(self.cache_dir, f"{code}.meta.json") + + if not os.path.exists(meta_file): + return False + + try: + with open(meta_file, 'r') as f: + meta = json.load(f) + + last_update = datetime.fromisoformat(meta.get('last_update', '')) + age = (datetime.now() - last_update).days + + return age <= max_age_days + except: + return False + + def clear(self, code: Optional[str] = None) -> None: + """清空缓存""" + if code: + cache_file = os.path.join(self.cache_dir, f"{code}.csv") + meta_file = os.path.join(self.cache_dir, f"{code}.meta.json") + + if os.path.exists(cache_file): + os.remove(cache_file) + if os.path.exists(meta_file): + os.remove(meta_file) + else: + # 清空整个缓存目录 + for f in os.listdir(self.cache_dir): + os.remove(os.path.join(self.cache_dir, f)) + + +class HybridDataSourceAdapter(DataSource): + """ + 混合数据源适配器(定制实现) + + 封装现有的HybridDataSource,适配到框架DataSource接口 + """ + + name = "hybrid" + + def __init__( + self, + use_cache: bool = True, + cache_dir: str = "data/etf_cache/daily", + ssh_config: Optional[Dict] = None + ): + """初始化混合数据源""" + super().__init__(use_cache=use_cache, cache_dir=cache_dir, ssh_config=ssh_config) + self.use_cache = use_cache + self.cache = LocalFileCache(cache_dir) if use_cache else None + self.ssh_config = ssh_config or {} + + # 内部使用现有的HybridDataSource + self._hybrid_source = None + + def _init_hybrid_source(self): + """延迟初始化HybridDataSource""" + if self._hybrid_source is None: + from core.datasource.hybrid_source import HybridDataSource + self._hybrid_source = HybridDataSource( + ssh_config=self.ssh_config, + use_cache=self.use_cache + ) + + def fetch(self, code: str, start: str, end: str) -> OHLCVData: + """获取单个标的数据""" + # 先检查缓存 + if self.cache and self.cache.is_fresh(code): + cached = self.cache.get(code, start, end) + if cached: + return cached + + # 从数据源获取 + self._init_hybrid_source() + + # 这里需要根据代码类型判断使用哪个数据源 + # 简化实现:直接调用现有HybridDataSource + # TODO: 完整实现需要适配现有数据源 + + return OHLCVData(code=code, data=pd.DataFrame()) + + def fetch_batch(self, codes: List[str], start: str, end: str) -> Dict[str, OHLCVData]: + """批量获取数据""" + result = {} + for code in codes: + result[code] = self.fetch(code, start, end) + return result + + def get_supported_codes(self) -> List[str]: + """获取支持的代码列表""" + # 从fund_basic.csv读取 + basic_file = "data/etf_cache/fund_basic.csv" + if os.path.exists(basic_file): + df = pd.read_csv(basic_file) + return df['code'].tolist() + return [] + + +class TushareDataSource(DataSource): + """ + Tushare数据源(定制实现) + + 用于获取A股数据 + """ + + name = "tushare" + + def __init__(self, token: Optional[str] = None): + """初始化Tushare数据源""" + super().__init__(token=token) + self.token = token + + def fetch(self, code: str, start: str, end: str) -> OHLCVData: + """获取A股指数数据""" + # TODO: 实现Tushare数据获取 + return OHLCVData(code=code, data=pd.DataFrame()) + + def fetch_batch(self, codes: List[str], start: str, end: str) -> Dict[str, OHLCVData]: + """批量获取""" + return {code: self.fetch(code, start, end) for code in codes} + + +class YFinanceDataSource(DataSource): + """ + YFinance数据源(定制实现) + + 用于获取港股/美股/加密货币数据 + """ + + name = "yfinance" + + def __init__(self, use_ssh_tunnel: bool = False, ssh_config: Optional[Dict] = None): + """初始化YFinance数据源""" + super().__init__(use_ssh_tunnel=use_ssh_tunnel, ssh_config=ssh_config) + self.use_ssh_tunnel = use_ssh_tunnel + self.ssh_config = ssh_config or {} + + def fetch(self, code: str, start: str, end: str) -> OHLCVData: + """获取境外数据""" + # TODO: 实现YFinance数据获取(含SSH隧道) + return OHLCVData(code=code, data=pd.DataFrame()) + + def fetch_batch(self, codes: List[str], start: str, end: str) -> Dict[str, OHLCVData]: + """批量获取""" + return {code: self.fetch(code, start, end) for code in codes} + + +# 导出定制数据源 +__all__ = [ + 'LocalFileCache', + 'HybridDataSourceAdapter', + 'TushareDataSource', + 'YFinanceDataSource' +] \ No newline at end of file