diff --git a/chart.py b/chart.py index 53f1a54..e978f2c 100644 --- a/chart.py +++ b/chart.py @@ -9,115 +9,6 @@ from lightweight_charts import Chart import random from datetime import datetime -horizontal_lines = {} - - -def get_fixed_color_based_on_period(period): - # 使用周期值作为随机种子,确保相同周期生成相同颜色 - random.seed(period) - - # 生成随机的RGB值 - r = random.randint(0, 255) - g = random.randint(0, 255) - b = random.randint(0, 255) - - # 将RGB值转换为十六进制颜色代码 - color_code = "#{:02x}{:02x}{:02x}".format(r, g, b) - - # 重置随机种子,避免影响其他随机操作 - random.seed(None) - - return color_code - - -def add_ema(df, chart, period: int = 50): - name = f"EMA_{period}" - df[name] = ta.EMA(df["close"], timeperiod=period) - color = get_fixed_color_based_on_period(period) - line = chart.create_line( - name, color=color, width=2, price_label=False, price_line=False - ) - line.set(df[["time", name]]) - - -def add_cci(df, chart, period: int = 14, height: float = 0.1, position: str = "bottom"): - cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period) - df[f"CCI_{period}"] = cci - cci_chart = chart.create_subchart( - position=position, width=1, height=height, sync=True - ) - cci_chart.legend( - visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" - ) - cci_chart.time_scale(visible=False) - cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2) - cci_line.set(df[["time", f"CCI_{period}"]]) - df = df[["time"]].copy() - df["h"] = 100 - df["l"] = -100 - cci_line = cci_chart.create_line( - name="h", - color="#D4C21C", - width=1, - style="dashed", - price_label=False, - price_line=False, - ) - cci_line.set(df[["time", "h"]]) - cci_line = cci_chart.create_line( - name="l", - color="#D4C21C", - width=1, - style="dashed", - price_label=False, - price_line=False, - ) - cci_line.set(df[["time", "l"]]) - - -def add_macd( - df, - chart, - fastperiod: int = 12, - slowperiod: int = 26, - signalperiod: int = 9, - height: float = 0.1, - position: str = "bottom", -): - macd, signal, hist = ta.MACD( - df["close"], - fastperiod=fastperiod, - slowperiod=slowperiod, - signalperiod=signalperiod, - ) - df["DIF"] = macd - df["DEA"] = signal - macd_name = f"MACD_{fastperiod}_{slowperiod}_{signalperiod}" - df[macd_name] = hist * 2 - macd_chart = chart.create_subchart( - position=position, width=1, height=height, sync=True - ) - macd_chart.legend( - visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" - ) - macd_chart.time_scale(visible=False) - - histogram = macd_chart.create_histogram(name=macd_name) - hist_data = df[["time", macd_name]].copy() - hist_data["color"] = hist_data[macd_name].apply( - lambda x: "#00FF00" if x < 0 else "#ff0000" # 绿色 : 红色 - ) - histogram.set(hist_data) - - macd_line = macd_chart.create_line( - name="DIF", color="#2962FF", width=2, price_label=False, price_line=False - ) - macd_line.set(df[["time", "DIF"]]) - signal_line = macd_chart.create_line( - name="DEA", color="#FF0000", width=2, price_label=False, price_line=False - ) - signal_line.set(df[["time", "DEA"]]) - def TD(dataframe: pd.DataFrame): close = dataframe["close"].to_list() @@ -136,77 +27,6 @@ def TD(dataframe: pd.DataFrame): return td -def add_TD(df, chart): - df["TD"] = TD(df) - td_line = chart.create_line( - name="TD", - color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条 - width=0, - price_line=False, - price_label=False, - price_scale_id="td_scale", - ) - td_line.precision(0) - td_line.set(df[["time", "TD"]]) - TDs = df[["time", "TD"]].to_dict(orient="records") - markers = [] - for item in TDs: - if item["TD"] in [9, 13]: - markers.append( - { - "time": item["time"].strftime("%Y-%m-%d"), - "position": "above", - "shape": "arrow_down", - "color": "#00FF00", - "text": f"{item['TD']}", - } - ) - elif item["TD"] in [-9, -13]: - markers.append( - { - "time": item["time"].strftime("%Y-%m-%d"), - "position": "below", - "shape": "arrow_up", - "color": "#FF0000", - "text": f"{item['TD']}", - } - ) - - chart.marker_list(markers) - - -def add_buy_sell_signal_markers(df, chart): - - if "buy" not in df.columns or "sell" not in df.columns: - return - - markers = [] - signals = df[["time", "buy", "sell"]].to_dict(orient="records") - for item in signals: - if item["buy"] == 1: - markers.append( - { - "time": item["time"].strftime("%Y-%m-%d"), - "position": "below", - "shape": "arrow_up", - "color": "#00FF00", - "text": f"B", - } - ) - elif item["sell"] == 1: - markers.append( - { - "time": item["time"].strftime("%Y-%m-%d"), - "position": "above", - "shape": "arrow_down", - "color": "#FF0000", - "text": f"S", - } - ) - - chart.marker_list(markers) - - def resample_data(df: pd.DataFrame, timeframe: str) -> pd.DataFrame: """ 对日线数据进行重采样 @@ -296,105 +116,330 @@ def POC(df, bins: int = 50): return poc -def on_range_change_poc(chart, bars_before, bars_after): - df = chart.candle_data - if df is None or df.empty: - return +def get_fixed_color_based_on_period(num: int): + # 使用周期值作为随机种子,确保相同周期生成相同颜色 + random.seed(num) - total_bars = len(df) - # TODO: k线拉到最早会报错 - if bars_after < 0: - start_idx = max(0, int(bars_before)) - end_idx = total_bars - else: - start_idx = int(bars_before) - end_idx = max(0, int(total_bars - bars_after)) - df_range = df.iloc[start_idx:end_idx] - # logger.info( - # f"Calculating POC for bars {start_idx} to {end_idx}, total_bars={total_bars}, bars_before={bars_before}, bars_after={bars_after}" - # ) - # logger.info(f"df_range_len: {len(df_range)}") - poc = POC(df_range) - poc_line_name = "POC" - if poc_line_name in horizontal_lines: - horizontal_lines[poc_line_name].delete() - # 添加新的 POC 水平线 - latest_close = df_range["close"].iloc[-1] - profit = (latest_close - poc) / poc * 100 - poc_line = chart.horizontal_line( - price=poc, color="#FF0000", width=4, style="solid", text=f"{poc:.2f}" - ) - chart.legend( - visible=True, - text=f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, tf_cnt: {len(df_range)}", - ) + # 生成随机的RGB值 + r = random.randint(0, 255) + g = random.randint(0, 255) + b = random.randint(0, 255) - horizontal_lines[poc_line_name] = poc_line + # 将RGB值转换为十六进制颜色代码 + color_code = "#{:02x}{:02x}{:02x}".format(r, g, b) + + # 重置随机种子,避免影响其他随机操作 + random.seed(None) + + return color_code -def check_df(df: pd.DataFrame): - # basic type check - if not isinstance(df, pd.DataFrame): - raise TypeError("df must be a pandas DataFrame") +class QuantChart: - required_cols = ["time", "open", "high", "low", "close", "volume"] - missing = [c for c in required_cols if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns: {missing}. Required: {required_cols}" + def __init__(self): + self.horizontal_lines = {} + self.legend_data = {} + self.visible_latest_close = None + + def update_legend(self, chart, key, text): + self.legend_data[key] = text + sorted_dict = dict(sorted(self.legend_data.items())) + full_text = ", ".join(sorted_dict.values()) + chart.legend(visible=True, text=full_text) + + def add_ema(self, df, chart, period: int = 50): + name = f"EMA_{period}" + df[name] = ta.EMA(df["close"], timeperiod=period) + color = get_fixed_color_based_on_period(num=period) + line = chart.create_line( + name, color=color, width=2, price_label=False, price_line=False ) + line.set(df[["time", name]]) - # time column must be datetime.datetime or pd.Timestamp (or a datetime64 dtype) - time_series = df["time"] - if not ( - pd.api.types.is_datetime64_any_dtype(time_series) - or time_series.apply(lambda x: isinstance(x, (pd.Timestamp, datetime))).all() + def add_cci( + self, df, chart, period: int = 14, height: float = 0.1, position: str = "bottom" ): - raise TypeError( - "Column 'time' must contain datetime values (python datetime.datetime or pandas.Timestamp) " - "or be a datetime64 dtype." + cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period) + df[f"CCI_{period}"] = cci + cci_chart = chart.create_subchart( + position=position, width=1, height=height, sync=True + ) + cci_chart.legend( + visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" + ) + cci_chart.time_scale(visible=False) + cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2) + cci_line.set(df[["time", f"CCI_{period}"]]) + df = df[["time"]].copy() + df["h"] = 100 + df["l"] = -100 + cci_line = cci_chart.create_line( + name="h", + color="#D4C21C", + width=1, + style="dashed", + price_label=False, + price_line=False, + ) + cci_line.set(df[["time", "h"]]) + cci_line = cci_chart.create_line( + name="l", + color="#D4C21C", + width=1, + style="dashed", + price_label=False, + price_line=False, + ) + cci_line.set(df[["time", "l"]]) + + def add_macd( + self, + df, + chart, + fastperiod: int = 12, + slowperiod: int = 26, + signalperiod: int = 9, + height: float = 0.1, + position: str = "bottom", + ): + macd, signal, hist = ta.MACD( + df["close"], + fastperiod=fastperiod, + slowperiod=slowperiod, + signalperiod=signalperiod, + ) + df["DIF"] = macd + df["DEA"] = signal + macd_name = f"MACD_{fastperiod}_{slowperiod}_{signalperiod}" + df[macd_name] = hist * 2 + macd_chart = chart.create_subchart( + position=position, width=1, height=height, sync=True + ) + macd_chart.legend( + visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" + ) + macd_chart.time_scale(visible=False) + + histogram = macd_chart.create_histogram(name=macd_name) + hist_data = df[["time", macd_name]].copy() + hist_data["color"] = hist_data[macd_name].apply( + lambda x: "#00FF00" if x < 0 else "#ff0000" # 绿色 : 红色 + ) + histogram.set(hist_data) + + macd_line = macd_chart.create_line( + name="DIF", color="#2962FF", width=2, price_label=False, price_line=False + ) + macd_line.set(df[["time", "DIF"]]) + signal_line = macd_chart.create_line( + name="DEA", color="#FF0000", width=2, price_label=False, price_line=False + ) + signal_line.set(df[["time", "DEA"]]) + + def add_TD(self, df, chart): + df["TD"] = TD(df) + td_line = chart.create_line( + name="TD", + color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条 + width=0, + price_line=False, + price_label=False, + price_scale_id="td_scale", + ) + td_line.precision(0) + td_line.set(df[["time", "TD"]]) + TDs = df[["time", "TD"]].to_dict(orient="records") + markers = [] + for item in TDs: + if item["TD"] in [9, 13]: + markers.append( + { + "time": item["time"].strftime("%Y-%m-%d"), + "position": "above", + "shape": "arrow_down", + "color": "#00FF00", + "text": f"{item['TD']}", + } + ) + elif item["TD"] in [-9, -13]: + markers.append( + { + "time": item["time"].strftime("%Y-%m-%d"), + "position": "below", + "shape": "arrow_up", + "color": "#FF0000", + "text": f"{item['TD']}", + } + ) + + chart.marker_list(markers) + + def add_buy_sell_signal_markers(self, df, chart): + + if "buy" not in df.columns or "sell" not in df.columns: + return + + markers = [] + signals = df[["time", "buy", "sell"]].to_dict(orient="records") + for item in signals: + if item["buy"] == 1: + markers.append( + { + "time": item["time"].strftime("%Y-%m-%d"), + "position": "below", + "shape": "arrow_up", + "color": "#00FF00", + "text": f"B", + } + ) + elif item["sell"] == 1: + markers.append( + { + "time": item["time"].strftime("%Y-%m-%d"), + "position": "above", + "shape": "arrow_down", + "color": "#FF0000", + "text": f"S", + } + ) + + chart.marker_list(markers) + + def on_range_change_poc(self, chart, bars_before, bars_after): + df = chart.candle_data + if df is None or df.empty: + return + + total_bars = len(df) + # TODO: k线拉到最早会报错 + if bars_after < 0: + start_idx = max(0, int(bars_before)) + end_idx = total_bars + else: + start_idx = int(bars_before) + end_idx = max(0, int(total_bars - bars_after)) + df_range = df.iloc[start_idx:end_idx] + # logger.info( + # f"Calculating POC for bars {start_idx} to {end_idx}, total_bars={total_bars}, bars_before={bars_before}, bars_after={bars_after}" + # ) + # logger.info(f"df_range_len: {len(df_range)}") + poc = POC(df_range) + poc_line_name = "POC" + if poc_line_name in self.horizontal_lines: + self.horizontal_lines[poc_line_name].delete() + # 添加新的 POC 水平线 + self.visible_latest_close = df_range["close"].iloc[-1] + profit = (self.visible_latest_close - poc) / poc * 100 + poc_line = chart.horizontal_line( + price=poc, color="#FF0000", width=4, style="solid", text=f"{poc:.2f}" + ) + # chart.legend( + # visible=True, + # text=f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, visible_tf_cnt: {len(df_range)}", + # ) + legend_text = f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, visible_tf_cnt: {len(df_range)}" + self.update_legend(chart=chart, key=poc_line_name, text=legend_text) + + self.horizontal_lines[poc_line_name] = poc_line + + def check_df(self, df: pd.DataFrame): + # basic type check + if not isinstance(df, pd.DataFrame): + raise TypeError("df must be a pandas DataFrame") + + required_cols = ["time", "open", "high", "low", "close", "volume"] + missing = [c for c in required_cols if c not in df.columns] + if missing: + raise ValueError( + f"Missing required columns: {missing}. Required: {required_cols}" + ) + + # time column must be datetime.datetime or pd.Timestamp (or a datetime64 dtype) + time_series = df["time"] + if not ( + pd.api.types.is_datetime64_any_dtype(time_series) + or time_series.apply( + lambda x: isinstance(x, (pd.Timestamp, datetime)) + ).all() + ): + raise TypeError( + "Column 'time' must contain datetime values (python datetime.datetime or pandas.Timestamp) " + "or be a datetime64 dtype." + ) + + # other columns must be numeric (int or float) + for col in ["open", "high", "low", "close", "volume"]: + if not pd.api.types.is_numeric_dtype(df[col]): + raise TypeError(f"Column '{col}' must be numeric (int or float)") + + def setup_crosshair_tracking(self, chart): + chart.run_script( + f""" + {chart.id}.chart.subscribeCrosshairMove((param) => {{ + if (!param.point) return; + const price = {chart.id}.series.coordinateToPrice(param.point.y); + if (price !== null) {{ + window.callbackFunction(`crosshair_price_~_${{price}}`); + }} + }}) + """ ) - # other columns must be numeric (int or float) - for col in ["open", "high", "low", "close", "volume"]: - if not pd.api.types.is_numeric_dtype(df[col]): - raise TypeError(f"Column '{col}' must be numeric (int or float)") + # 注册回调处理 + def on_crosshair_price(price_str): + # 实时获取鼠标位置y轴的价格 + price = float(price_str) + if self.visible_latest_close is not None: + profit = (self.visible_latest_close - price) / price * 100 + self.update_legend( + chart=chart, + key="Crosshair Price Profit%", + text=f"Crosshair Price Profit%: {profit:.2f}%", + ) + chart.win.handlers["crosshair_price"] = lambda p: on_crosshair_price(p) -def plot_chart( - df, symbol: str, name: str, timeframe: str, init_visible_num_bars: int = 90 -): - # 校验数据是否满足 - check_df(df) + def plot_chart( + self, + df, + symbol: str, + name: str, + timeframe: str, + init_visible_num_bars: int = 90, + ): + # 校验数据是否满足 + self.check_df(df) - chart = Chart(toolbox=True, inner_height=0.8, maximize=True) - chart.topbar.textbox("symbol", symbol) - chart.topbar.textbox("name", name) - chart.topbar.textbox("timeframe", timeframe) - chart.legend( - visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" - ) - chart.set(df) + chart = Chart(toolbox=True, inner_height=0.8, maximize=True) + chart.topbar.textbox("symbol", symbol) + chart.topbar.textbox("name", name) + chart.topbar.textbox("timeframe", timeframe) + chart.legend( + visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman" + ) + chart.set(df) - # 设置刚进入chart时的可见k线数量范围 - end_time = df["time"].iloc[-1] - start_time = df["time"].iloc[-init_visible_num_bars] - chart.set_visible_range(start_time, end_time) + # 设置刚进入chart时的可见k线数量范围 + end_time = df["time"].iloc[-1] + start_time = df["time"].iloc[-init_visible_num_bars] + chart.set_visible_range(start_time, end_time) - # 设置每次放缩k线范围时的回调函数计算实时计算poc - chart.events.range_change += on_range_change_poc + # 设置每次放缩k线范围时的回调函数计算实时计算poc + chart.events.range_change += self.on_range_change_poc - # 添加技术指标 - # add_ema(df, chart, period=10) - # add_ema(df, chart, period=20) - add_ema(df, chart, period=30) - # add_ema(df, chart, period=60) - add_macd(df, chart) - add_cci(df, chart, period=14) - add_TD(df, chart) - add_buy_sell_signal_markers(df, chart) + self.setup_crosshair_tracking(chart) - chart.show(block=True) + # 添加技术指标 + # add_ema(df, chart, period=10) + # add_ema(df, chart, period=20) + self.add_ema(df, chart, period=30) + # add_ema(df, chart, period=60) + self.add_macd(df, chart) + self.add_cci(df, chart, period=14) + self.add_TD(df, chart) + self.add_buy_sell_signal_markers(df, chart) + + chart.show(block=True) if __name__ == "__main__":