Files
etf/archive/config/settings.py
aszerW c905230a40 refactor(archive): move unused modules to archive/
Archive legacy framework and utility modules that are no longer
referenced by the active core (datasource/ and rotation/):

- framework/ -> archive/framework/
- framework_v2/ -> archive/framework_v2/
- strategies/ -> archive/strategies/
- config/ -> archive/config/
- visualization/ -> archive/visualization/
- scripts/ -> archive/scripts/
- tests/ -> archive/tests/
- run_rotation.py, run_us_rotation.py -> archive/single_files/
- compare_*.py, test_api_dates.py -> archive/single_files/
2026-06-03 23:41:46 +08:00

84 lines
1.8 KiB
Python

"""
配置管理模块
从环境变量读取配置信息
"""
import os
from typing import Dict, List, Optional
# 默认基准
DEFAULT_BENCHMARK_CODE = "000300.SH"
DEFAULT_BENCHMARK_NAME = "沪深300指数"
# 默认代码名称映射
DEFAULT_CODE_NAME_MAP = {
"000300.SH": "沪深300",
"000905.SH": "中证500",
"000852.SH": "中证1000",
}
def get_dingtalk_config() -> Dict[str, str]:
"""
获取钉钉机器人配置(第一个群)
Returns:
dict: 包含 webhook 和 secret
"""
webhook = os.getenv("DINGTALK_WEBHOOK", "")
secret = os.getenv("DINGTALK_SECRET", "")
return {
"webhook": webhook,
"secret": secret,
}
def get_all_dingtalk_configs() -> List[Dict[str, str]]:
"""
获取所有钉钉机器人配置(支持多群)
环境变量格式:
群1: DINGTALK_WEBHOOK_1 + DINGTALK_SECRET_1
群2: DINGTALK_WEBHOOK_2 + DINGTALK_SECRET_2
群3: DINGTALK_WEBHOOK_3 + DINGTALK_SECRET_3
...
Returns:
list: 配置列表,每项包含 webhook 和 secret
"""
configs = []
# 从 i=1 开始读取编号配置
i = 1
while True:
webhook = os.getenv(f"DINGTALK_WEBHOOK_{i}", "")
secret = os.getenv(f"DINGTALK_SECRET_{i}", "")
if not webhook:
break
configs.append({
"webhook": webhook,
"secret": secret,
})
i += 1
return configs
def get_db_config() -> Dict[str, str]:
"""
获取数据库配置
Returns:
dict: 数据库配置
"""
return {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", "5432"),
"user": os.getenv("DB_USER", ""),
"password": os.getenv("DB_PASSWORD", ""),
"database": os.getenv("DB_NAME", ""),
}