Files
etf/archive/legacy_tests/tests/fetch_3033.py
aszerW 1fca536c95 refactor: 归档旧代码,保留新框架结构
归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
2026-05-11 23:34:23 +08:00

101 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
获取 3033.HK 最新10条数据
对比复权和不复权价格
"""
import yfinance as yf
import pandas as pd
from typing import Optional
import os
def fetch_3033(period: str = "10d", proxy: Optional[str] = None) -> None:
"""
获取 3033.HK 数据,对比复权和不复权价格
Args:
period: 获取周期默认10天
proxy: 代理地址,如 "socks5://127.0.0.1:1080"
"""
# 设置代理
if proxy:
os.environ['HTTP_PROXY'] = proxy
os.environ['HTTPS_PROXY'] = proxy
print(f"使用代理: {proxy}")
code = "3033.HK"
ticker = yf.Ticker(code)
print(f"\n{'='*80}")
print(f"获取 {code} 最新数据")
print(f"{'='*80}\n")
# 1. 不复权价格 (auto_adjust=False)
print("【1. 不复权价格】(auto_adjust=False)")
print("-" * 80)
data_raw = ticker.history(period=period, auto_adjust=False)
if not data_raw.empty:
print(f"{'日期':<12} {'开盘':>10} {'收盘':>10} {'最高':>10} {'最低':>10} {'成交量':>12}")
print("-" * 80)
for date, row in data_raw.tail(10).iterrows():
date_str = date.strftime('%Y-%m-%d')
print(f"{date_str:<12} {row['Open']:>10.3f} {row['Close']:>10.3f} {row['High']:>10.3f} {row['Low']:>10.3f} {row['Volume']:>12.0f}")
print(f"\n最新收盘价: {data_raw['Close'].iloc[-1]:.3f}")
else:
print("无数据")
# 2. 前复权价格 (auto_adjust=True默认)
print(f"\n{'='*80}")
print("【2. 前复权价格】(auto_adjust=True默认)")
print("-" * 80)
data_adj = ticker.history(period=period, auto_adjust=True)
if not data_adj.empty:
print(f"{'日期':<12} {'开盘':>10} {'收盘':>10} {'最高':>10} {'最低':>10} {'成交量':>12}")
print("-" * 80)
for date, row in data_adj.tail(10).iterrows():
date_str = date.strftime('%Y-%m-%d')
print(f"{date_str:<12} {row['Open']:>10.3f} {row['Close']:>10.3f} {row['High']:>10.3f} {row['Low']:>10.3f} {row['Volume']:>12.0f}")
print(f"\n最新收盘价: {data_adj['Close'].iloc[-1]:.3f}")
else:
print("无数据")
# 3. 对比
if not data_raw.empty and not data_adj.empty:
print(f"\n{'='*80}")
print("【3. 价格对比】")
print("-" * 80)
latest_raw = data_raw['Close'].iloc[-1]
latest_adj = data_adj['Close'].iloc[-1]
print(f"不复权最新价: {latest_raw:.3f}")
print(f"前复权最新价: {latest_adj:.3f}")
print(f"差异: {abs(latest_raw - latest_adj):.3f} ({abs(latest_raw - latest_adj)/latest_raw*100:.2f}%)")
print(f"\n{'='*80}")
def main():
"""主函数"""
# 检查是否需要使用代理
# 如果 Clash 开启,使用 Clash HTTP 代理
# 如果 SSH 隧道开启,使用 SOCKS5 代理
# 使用 Clash HTTP 代理
proxy = "http://127.0.0.1:7890"
# 如果 Clash 不可用,尝试 SOCKS5 代理SSH 隧道)
# proxy = "socks5://127.0.0.1:1080"
try:
fetch_3033(period="10d", proxy=proxy)
except Exception as e:
print(f"使用代理 {proxy} 失败: {e}")
print("\n尝试不使用代理...")
try:
fetch_3033(period="10d", proxy=None)
except Exception as e2:
print(f"无代理也失败: {e2}")
if __name__ == "__main__":
main()