Files
etf/archive/legacy_scripts/scripts/run_cci_screener.py
aszerW 1fca536c95 refactor: 归档旧代码,保留新框架结构
归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
2026-05-11 23:34:23 +08:00

86 lines
2.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CCI技术指标筛选器入口
用法:
python scripts/run_cci_screener.py
python scripts/run_cci_screener.py --config config/strategies/cci.yaml
python scripts/run_cci_screener.py --schedule # 定时模式
"""
import sys
import time
import yaml
import argparse
import schedule
from pathlib import Path
from datetime import datetime
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from strategies.screener.cci import CCIScreener
def load_config(config_path: str) -> dict:
"""加载配置文件"""
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def run_screening(config: dict):
"""执行一次筛选"""
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始CCI筛选...")
screener = CCIScreener(config)
signals = screener.run_screening()
return signals
def main():
parser = argparse.ArgumentParser(description="CCI技术指标筛选")
parser.add_argument(
"--config",
type=str,
default="config/strategies/cci.yaml",
help="配置文件路径",
)
parser.add_argument(
"--schedule",
action="store_true",
help="启用定时模式",
)
args = parser.parse_args()
# 加载配置
config = load_config(args.config)
print("=" * 60)
print(" CCI技术指标筛选器")
print("=" * 60)
print(f"\n配置文件: {args.config}")
print(f"日线周期: {config.get('day_period', 14)}")
print(f"周线周期: {config.get('week_period', 14)}")
print(f"筛选阈值: {config.get('threshold', -100)}")
print(f"数据源: {config.get('data_source', 'postgresql')}")
if args.schedule:
# 定时模式
schedule_time = config.get("schedule_time", "19:00")
print(f"\n定时模式已启用,每天 {schedule_time} 执行")
print("按 Ctrl+C 停止\n")
schedule.every().day.at(schedule_time).do(run_screening, config)
while True:
schedule.run_pending()
time.sleep(1)
else:
# 单次执行
run_screening(config)
if __name__ == "__main__":
main()