import pandas as pd from db_config import DatabaseManager, DatabaseConfig from loguru import logger import talib as ta import numpy as np from lightweight_charts import Chart import random horizontal_lines = {} def get_fixed_color_based_on_period(period): # 使用周期值作为随机种子,确保相同周期生成相同颜色 random.seed(period) # 生成随机的RGB值 r = random.randint(0, 255) g = random.randint(0, 255) b = random.randint(0, 255) # 将RGB值转换为十六进制颜色代码 color_code = "#{:02x}{:02x}{:02x}".format(r, g, b) # 重置随机种子,避免影响其他随机操作 random.seed(None) return color_code def add_ema(df, chart, period: int = 50): name = f"EMA {period}" df[name] = ta.EMA(df["close"], timeperiod=period) color = get_fixed_color_based_on_period(period) line = chart.create_line( name, color=color, width=2, price_label=False, price_line=False ) line.set(df[["time", name]]) def add_cci(df, chart, period: int = 14, height: float = 0.1, position: str = "bottom"): cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period) df[f"CCI_{period}"] = cci cci_chart = chart.create_subchart( position=position, width=1, height=height, sync=True ) cci_chart.legend( visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" ) cci_chart.time_scale(visible=False) cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2) cci_line.set(df[["time", f"CCI_{period}"]]) df = df[["time"]].copy() df["h"] = 100 df["l"] = -100 cci_line = cci_chart.create_line( name="h", color="#D4C21C", width=1, style="dashed", price_label=False, price_line=False, ) cci_line.set(df[["time", "h"]]) cci_line = cci_chart.create_line( name="l", color="#D4C21C", width=1, style="dashed", price_label=False, price_line=False, ) cci_line.set(df[["time", "l"]]) def add_macd( df, chart, fastperiod: int = 12, slowperiod: int = 26, signalperiod: int = 9, height: float = 0.1, position: str = "bottom", ): macd, signal, hist = ta.MACD( df["close"], fastperiod=fastperiod, slowperiod=slowperiod, signalperiod=signalperiod, ) df["DIF"] = macd df["DEA"] = signal df["MACD"] = hist * 2 macd_chart = chart.create_subchart( position=position, width=1, height=height, sync=True ) macd_chart.legend( visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" ) macd_chart.time_scale(visible=False) histogram = macd_chart.create_histogram(name="MACD") hist_data = df[["time", "MACD"]].copy() hist_data["color"] = hist_data["MACD"].apply( lambda x: "#00FF00" if x < 0 else "#ff0000" # 绿色 : 红色 ) histogram.set(hist_data) macd_line = macd_chart.create_line( name="DIF", color="#2962FF", width=2, price_label=False, price_line=False ) macd_line.set(df[["time", "DIF"]]) signal_line = macd_chart.create_line( name="DEA", color="#FF0000", width=2, price_label=False, price_line=False ) signal_line.set(df[["time", "DEA"]]) def TD(dataframe: pd.DataFrame): close = dataframe["close"].to_list() td = [0, 0, 0, 0] up = 0 down = 0 for i in range(4, len(close)): if close[i] > close[i - 4]: up += 1 down = 0 td.append(up) else: down -= 1 up = 0 td.append(down) return td def add_TD(df, chart): df["TD"] = TD(df) td_line = chart.create_line( name="TD", color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条 width=0, price_line=False, price_label=False, price_scale_id="td_scale", ) td_line.precision(0) td_line.set(df[["time", "TD"]]) TDs = df[["time", "TD"]].to_dict(orient="records") markers = [] for item in TDs: if item["TD"] in [9, 13]: markers.append( { "time": item["time"].strftime("%Y-%m-%d"), "position": "above", "shape": "arrow_down", "color": "#00FF00", "text": f"{item['TD']}", } ) elif item["TD"] in [-9, -13]: markers.append( { "time": item["time"].strftime("%Y-%m-%d"), "position": "below", "shape": "arrow_up", "color": "#FF0000", "text": f"{item['TD']}", } ) chart.marker_list(markers) def add_buy_sell_signal_markers(df, chart): if "buy" not in df.columns or "sell" not in df.columns: return markers = [] signals = df[["time", "buy", "sell"]].to_dict(orient="records") for item in signals: if item["buy"] == 1: markers.append( { "time": item["time"].strftime("%Y-%m-%d"), "position": "below", "shape": "arrow_up", "color": "#00FF00", "text": f"B", } ) elif item["sell"] == 1: markers.append( { "time": item["time"].strftime("%Y-%m-%d"), "position": "above", "shape": "arrow_down", "color": "#FF0000", "text": f"S", } ) chart.marker_list(markers) def get_kline(code: str) -> list: """ 获取所有指数代码 :return: """ env = "online" env = "daily" db_config = DatabaseConfig(env=env) logger.info(f"数据库连接: {db_config.connection_string}") db_manager = DatabaseManager(db_config) sql = f"SELECT date as time, open, high, low, close, volume FROM public.index_kline where code='{code}' order by date;" res = db_manager.execute_query(sql) data_list = [dict(item) for item in res] df = pd.DataFrame(data_list) df["time"] = pd.to_datetime(df["time"]) num_cols = ["open", "high", "low", "close", "volume"] for col in num_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) return df def resample_data(df: pd.DataFrame, timeframe: str) -> pd.DataFrame: """ 对日线数据进行重采样 :param df: 原始日线数据(time 作为索引) :param timeframe: 目标周期 ('1D', '1W', '1M', '3M', '1Y') :return: 重采样后的 DataFrame """ # 映射周期到 pandas resample 规则 timeframe_map = { "1D": "D", # 日线 "1W": "W", # 周线 "1M": "M", # 月线 "3M": "3M", # 季线 "1Y": "Y", # 年线 } if timeframe not in timeframe_map: return df df = df.set_index("time") rule = timeframe_map[timeframe] # 对 OHLCV 数据进行重采样 resampled = ( df.resample(rule) .agg( { "open": "first", # 开盘价取第一个 "high": "max", # 最高价取最大值 "low": "min", # 最低价取最小值 "close": "last", # 收盘价取最后一个 "volume": "sum", # 成交量求和 } ) .dropna() ) # 重置索引,将 time 变回列 resampled = resampled.reset_index() return resampled def POC(df, bins: int = 50): """ 计算价格分布的成交量峰值位置(POC,Point Of Control) 参数: df: 包含 'low', 'high', 'volume' 三列的 DataFrame(每行代表一根 K 线) bins: 将价格区间划分为多少个小区间(价格桶),默认为 50 返回: poc: 代表成交量最大的价格区间的中点价格 """ # 当前数据的最低价和最高价,用于构建价格区间 low, high = df["low"].min(), df["high"].max() # 将整个价格区间等分为 bins 个小区间,需要 bins+1 个边界点 price_ranges = np.linspace(low, high, bins + 1) # 初始化每个价格区间累积的成交量数组,长度为 bins(区间个数) volume_per_price = np.zeros(bins) # 遍历每一根 K 线,将该 K 线的成交量按覆盖的价格区间平均分配 for i in range(len(df)): high = df["high"].iloc[i] low = df["low"].iloc[i] volume = df["volume"].iloc[i] # 找到当前 K 线覆盖的价格边界(注意使用闭区间判断) # price_ranges 表示边界点,若 price_ranges[j] 在 [low, high] 范围内,则第 j 个边界被覆盖 price_coverage = (price_ranges >= low) & (price_ranges <= high) covered_bins = np.where(price_coverage)[0] if len(covered_bins) > 0: # 如果覆盖了多个边界点,则这些边界之间形成了若干完整的区间 # 将该 K 线的成交量平均分配到这些被覆盖的区间上 # 注意:covered_bins 里是边界索引,最后一个边界索引对应的区间索引需小于 bins volume_per_bin = volume / len(covered_bins) for bin_idx in covered_bins: if bin_idx < bins: volume_per_price[bin_idx] += volume_per_bin # 找到成交量最大的区间索引 idx = np.argmax(volume_per_price) # 以该区间的两个边界的中点作为 POC 值 poc = (price_ranges[idx] + price_ranges[idx + 1]) / 2 return poc def on_range_change_poc(chart, bars_before, bars_after): df = chart.candle_data if df is None or df.empty: return total_bars = len(df) # TODO: k线拉到最早会报错 if bars_after < 0: start_idx = max(0, int(bars_before)) end_idx = total_bars else: start_idx = int(bars_before) end_idx = max(0, int(total_bars - bars_after)) df_range = df.iloc[start_idx:end_idx] # logger.info( # f"Calculating POC for bars {start_idx} to {end_idx}, total_bars={total_bars}, bars_before={bars_before}, bars_after={bars_after}" # ) # logger.info(f"df_range_len: {len(df_range)}") poc = POC(df_range) poc_line_name = "POC" if poc_line_name in horizontal_lines: horizontal_lines[poc_line_name].delete() # 添加新的 POC 水平线 latest_close = df_range["close"].iloc[-1] profit = (latest_close - poc) / poc * 100 poc_line = chart.horizontal_line( price=poc, color="#FF0000", width=4, style="solid", text=f"{poc:.2f}" ) chart.legend( visible=True, text=f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, tf_cnt: {len(df_range)}", ) horizontal_lines[poc_line_name] = poc_line def plot_chart(df, symbol: str, name: str, timeframe: str): """ df: DataFrame time, open, high, low, close, volume, [buy, sell]可选 buy=1 买入信号, sell=1 卖出信号 """ chart = Chart(toolbox=True, inner_height=0.8, maximize=True) chart.topbar.textbox("symbol", symbol) chart.topbar.textbox("name", name) chart.topbar.textbox("timeframe", timeframe) chart.legend( visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" ) chart.set(df) chart.events.range_change += on_range_change_poc add_ema(df, chart, period=10) add_ema(df, chart, period=20) add_ema(df, chart, period=30) add_ema(df, chart, period=60) add_macd(df, chart) add_cci(df, chart, period=14) add_TD(df, chart) add_buy_sell_signal_markers(df, chart) chart.show(block=True) if __name__ == "__main__": symbol = "399998" timeframe = "1W" df = pd.read_csv( "/Users/aszer/Documents/vscode/etf/data/index_all_stock.csv", encoding="utf-8-sig", ) name = df.loc[df["代码"] == symbol, "名称"].values[0] df = get_kline(code=symbol) df = resample_data(df, timeframe) # df['buy'] = df['time'].apply(lambda x: 1 if pd.to_datetime(x).day % 2 == 1 else 0) # df['sell'] = df['time'].apply(lambda x: 1 if pd.to_datetime(x).day % 2 == 0 else 0) plot_chart(df=df, symbol=symbol, name=name, timeframe=timeframe)