Files
etf/scripts/run_cci_screener.py
aszerW 988c2335fb chore(config): 添加环境变量示例及.gitignore更新
- 新增 .env.example,包含 Tushare API、钉钉机器人和PostgreSQL数据库配置模板
- 更新.gitignore,忽略本地配置文件如 .env.local 和 config_local.py
- 添加对报表文件命名规则的支持,保留示例文件不忽略
- 删除废弃的 chart.py 及相关图表模块代码
- 新增 config/settings.py,实现从环境变量读取配置的统一接口
- 设置数据目录及缓存目录,确保目录存在,提高配置管理规范性
2026-03-18 23:33:40 +08:00

86 lines
2.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
CCI技术指标筛选器入口
用法:
python scripts/run_cci_screener.py
python scripts/run_cci_screener.py --config config/strategies/cci.yaml
python scripts/run_cci_screener.py --schedule # 定时模式
"""
import sys
import time
import yaml
import argparse
import schedule
from pathlib import Path
from datetime import datetime
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from strategies.screener.cci import CCIScreener
def load_config(config_path: str) -> dict:
"""加载配置文件"""
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def run_screening(config: dict):
"""执行一次筛选"""
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始CCI筛选...")
screener = CCIScreener(config)
signals = screener.run_screening()
return signals
def main():
parser = argparse.ArgumentParser(description="CCI技术指标筛选")
parser.add_argument(
"--config",
type=str,
default="config/strategies/cci.yaml",
help="配置文件路径",
)
parser.add_argument(
"--schedule",
action="store_true",
help="启用定时模式",
)
args = parser.parse_args()
# 加载配置
config = load_config(args.config)
print("=" * 60)
print(" CCI技术指标筛选器")
print("=" * 60)
print(f"\n配置文件: {args.config}")
print(f"日线周期: {config.get('day_period', 14)}")
print(f"周线周期: {config.get('week_period', 14)}")
print(f"筛选阈值: {config.get('threshold', -100)}")
print(f"数据源: {config.get('data_source', 'postgresql')}")
if args.schedule:
# 定时模式
schedule_time = config.get("schedule_time", "19:00")
print(f"\n定时模式已启用,每天 {schedule_time} 执行")
print("按 Ctrl+C 停止\n")
schedule.every().day.at(schedule_time).do(run_screening, config)
while True:
schedule.run_pending()
time.sleep(1)
else:
# 单次执行
run_screening(config)
if __name__ == "__main__":
main()