修改为类;添加update_legend函数;添加实时鼠标y轴价格获取并计算该价格和可见范围内的最新收盘价之间的利润Crosshair Price Profit%

This commit is contained in:
2025-10-19 11:59:05 +08:00
parent bcfffe4181
commit e363332827

569
chart.py
View File

@@ -9,115 +9,6 @@ from lightweight_charts import Chart
import random
from datetime import datetime
horizontal_lines = {}
def get_fixed_color_based_on_period(period):
# 使用周期值作为随机种子,确保相同周期生成相同颜色
random.seed(period)
# 生成随机的RGB值
r = random.randint(0, 255)
g = random.randint(0, 255)
b = random.randint(0, 255)
# 将RGB值转换为十六进制颜色代码
color_code = "#{:02x}{:02x}{:02x}".format(r, g, b)
# 重置随机种子,避免影响其他随机操作
random.seed(None)
return color_code
def add_ema(df, chart, period: int = 50):
name = f"EMA_{period}"
df[name] = ta.EMA(df["close"], timeperiod=period)
color = get_fixed_color_based_on_period(period)
line = chart.create_line(
name, color=color, width=2, price_label=False, price_line=False
)
line.set(df[["time", name]])
def add_cci(df, chart, period: int = 14, height: float = 0.1, position: str = "bottom"):
cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period)
df[f"CCI_{period}"] = cci
cci_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
cci_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
cci_chart.time_scale(visible=False)
cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2)
cci_line.set(df[["time", f"CCI_{period}"]])
df = df[["time"]].copy()
df["h"] = 100
df["l"] = -100
cci_line = cci_chart.create_line(
name="h",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "h"]])
cci_line = cci_chart.create_line(
name="l",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "l"]])
def add_macd(
df,
chart,
fastperiod: int = 12,
slowperiod: int = 26,
signalperiod: int = 9,
height: float = 0.1,
position: str = "bottom",
):
macd, signal, hist = ta.MACD(
df["close"],
fastperiod=fastperiod,
slowperiod=slowperiod,
signalperiod=signalperiod,
)
df["DIF"] = macd
df["DEA"] = signal
macd_name = f"MACD_{fastperiod}_{slowperiod}_{signalperiod}"
df[macd_name] = hist * 2
macd_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
macd_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
macd_chart.time_scale(visible=False)
histogram = macd_chart.create_histogram(name=macd_name)
hist_data = df[["time", macd_name]].copy()
hist_data["color"] = hist_data[macd_name].apply(
lambda x: "#00FF00" if x < 0 else "#ff0000" # 绿色 : 红色
)
histogram.set(hist_data)
macd_line = macd_chart.create_line(
name="DIF", color="#2962FF", width=2, price_label=False, price_line=False
)
macd_line.set(df[["time", "DIF"]])
signal_line = macd_chart.create_line(
name="DEA", color="#FF0000", width=2, price_label=False, price_line=False
)
signal_line.set(df[["time", "DEA"]])
def TD(dataframe: pd.DataFrame):
close = dataframe["close"].to_list()
@@ -136,77 +27,6 @@ def TD(dataframe: pd.DataFrame):
return td
def add_TD(df, chart):
df["TD"] = TD(df)
td_line = chart.create_line(
name="TD",
color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条
width=0,
price_line=False,
price_label=False,
price_scale_id="td_scale",
)
td_line.precision(0)
td_line.set(df[["time", "TD"]])
TDs = df[["time", "TD"]].to_dict(orient="records")
markers = []
for item in TDs:
if item["TD"] in [9, 13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#00FF00",
"text": f"{item['TD']}",
}
)
elif item["TD"] in [-9, -13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#FF0000",
"text": f"{item['TD']}",
}
)
chart.marker_list(markers)
def add_buy_sell_signal_markers(df, chart):
if "buy" not in df.columns or "sell" not in df.columns:
return
markers = []
signals = df[["time", "buy", "sell"]].to_dict(orient="records")
for item in signals:
if item["buy"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#00FF00",
"text": f"B",
}
)
elif item["sell"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#FF0000",
"text": f"S",
}
)
chart.marker_list(markers)
def resample_data(df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
"""
对日线数据进行重采样
@@ -296,105 +116,330 @@ def POC(df, bins: int = 50):
return poc
def on_range_change_poc(chart, bars_before, bars_after):
df = chart.candle_data
if df is None or df.empty:
return
def get_fixed_color_based_on_period(num: int):
# 使用周期值作为随机种子,确保相同周期生成相同颜色
random.seed(num)
total_bars = len(df)
# TODO: k线拉到最早会报错
if bars_after < 0:
start_idx = max(0, int(bars_before))
end_idx = total_bars
else:
start_idx = int(bars_before)
end_idx = max(0, int(total_bars - bars_after))
df_range = df.iloc[start_idx:end_idx]
# logger.info(
# f"Calculating POC for bars {start_idx} to {end_idx}, total_bars={total_bars}, bars_before={bars_before}, bars_after={bars_after}"
# )
# logger.info(f"df_range_len: {len(df_range)}")
poc = POC(df_range)
poc_line_name = "POC"
if poc_line_name in horizontal_lines:
horizontal_lines[poc_line_name].delete()
# 添加新的 POC 水平线
latest_close = df_range["close"].iloc[-1]
profit = (latest_close - poc) / poc * 100
poc_line = chart.horizontal_line(
price=poc, color="#FF0000", width=4, style="solid", text=f"{poc:.2f}"
)
chart.legend(
visible=True,
text=f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, tf_cnt: {len(df_range)}",
)
# 生成随机的RGB值
r = random.randint(0, 255)
g = random.randint(0, 255)
b = random.randint(0, 255)
horizontal_lines[poc_line_name] = poc_line
# 将RGB值转换为十六进制颜色代码
color_code = "#{:02x}{:02x}{:02x}".format(r, g, b)
# 重置随机种子,避免影响其他随机操作
random.seed(None)
return color_code
def check_df(df: pd.DataFrame):
# basic type check
if not isinstance(df, pd.DataFrame):
raise TypeError("df must be a pandas DataFrame")
class QuantChart:
required_cols = ["time", "open", "high", "low", "close", "volume"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(
f"Missing required columns: {missing}. Required: {required_cols}"
def __init__(self):
self.horizontal_lines = {}
self.legend_data = {}
self.visible_latest_close = None
def update_legend(self, chart, key, text):
self.legend_data[key] = text
sorted_dict = dict(sorted(self.legend_data.items()))
full_text = ", ".join(sorted_dict.values())
chart.legend(visible=True, text=full_text)
def add_ema(self, df, chart, period: int = 50):
name = f"EMA_{period}"
df[name] = ta.EMA(df["close"], timeperiod=period)
color = get_fixed_color_based_on_period(num=period)
line = chart.create_line(
name, color=color, width=2, price_label=False, price_line=False
)
line.set(df[["time", name]])
# time column must be datetime.datetime or pd.Timestamp (or a datetime64 dtype)
time_series = df["time"]
if not (
pd.api.types.is_datetime64_any_dtype(time_series)
or time_series.apply(lambda x: isinstance(x, (pd.Timestamp, datetime))).all()
def add_cci(
self, df, chart, period: int = 14, height: float = 0.1, position: str = "bottom"
):
raise TypeError(
"Column 'time' must contain datetime values (python datetime.datetime or pandas.Timestamp) "
"or be a datetime64 dtype."
cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period)
df[f"CCI_{period}"] = cci
cci_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
cci_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
cci_chart.time_scale(visible=False)
cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2)
cci_line.set(df[["time", f"CCI_{period}"]])
df = df[["time"]].copy()
df["h"] = 100
df["l"] = -100
cci_line = cci_chart.create_line(
name="h",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "h"]])
cci_line = cci_chart.create_line(
name="l",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "l"]])
def add_macd(
self,
df,
chart,
fastperiod: int = 12,
slowperiod: int = 26,
signalperiod: int = 9,
height: float = 0.1,
position: str = "bottom",
):
macd, signal, hist = ta.MACD(
df["close"],
fastperiod=fastperiod,
slowperiod=slowperiod,
signalperiod=signalperiod,
)
df["DIF"] = macd
df["DEA"] = signal
macd_name = f"MACD_{fastperiod}_{slowperiod}_{signalperiod}"
df[macd_name] = hist * 2
macd_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
macd_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
macd_chart.time_scale(visible=False)
histogram = macd_chart.create_histogram(name=macd_name)
hist_data = df[["time", macd_name]].copy()
hist_data["color"] = hist_data[macd_name].apply(
lambda x: "#00FF00" if x < 0 else "#ff0000" # 绿色 : 红色
)
histogram.set(hist_data)
macd_line = macd_chart.create_line(
name="DIF", color="#2962FF", width=2, price_label=False, price_line=False
)
macd_line.set(df[["time", "DIF"]])
signal_line = macd_chart.create_line(
name="DEA", color="#FF0000", width=2, price_label=False, price_line=False
)
signal_line.set(df[["time", "DEA"]])
def add_TD(self, df, chart):
df["TD"] = TD(df)
td_line = chart.create_line(
name="TD",
color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条
width=0,
price_line=False,
price_label=False,
price_scale_id="td_scale",
)
td_line.precision(0)
td_line.set(df[["time", "TD"]])
TDs = df[["time", "TD"]].to_dict(orient="records")
markers = []
for item in TDs:
if item["TD"] in [9, 13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#00FF00",
"text": f"{item['TD']}",
}
)
elif item["TD"] in [-9, -13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#FF0000",
"text": f"{item['TD']}",
}
)
chart.marker_list(markers)
def add_buy_sell_signal_markers(self, df, chart):
if "buy" not in df.columns or "sell" not in df.columns:
return
markers = []
signals = df[["time", "buy", "sell"]].to_dict(orient="records")
for item in signals:
if item["buy"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#00FF00",
"text": f"B",
}
)
elif item["sell"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#FF0000",
"text": f"S",
}
)
chart.marker_list(markers)
def on_range_change_poc(self, chart, bars_before, bars_after):
df = chart.candle_data
if df is None or df.empty:
return
total_bars = len(df)
# TODO: k线拉到最早会报错
if bars_after < 0:
start_idx = max(0, int(bars_before))
end_idx = total_bars
else:
start_idx = int(bars_before)
end_idx = max(0, int(total_bars - bars_after))
df_range = df.iloc[start_idx:end_idx]
# logger.info(
# f"Calculating POC for bars {start_idx} to {end_idx}, total_bars={total_bars}, bars_before={bars_before}, bars_after={bars_after}"
# )
# logger.info(f"df_range_len: {len(df_range)}")
poc = POC(df_range)
poc_line_name = "POC"
if poc_line_name in self.horizontal_lines:
self.horizontal_lines[poc_line_name].delete()
# 添加新的 POC 水平线
self.visible_latest_close = df_range["close"].iloc[-1]
profit = (self.visible_latest_close - poc) / poc * 100
poc_line = chart.horizontal_line(
price=poc, color="#FF0000", width=4, style="solid", text=f"{poc:.2f}"
)
# chart.legend(
# visible=True,
# text=f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, visible_tf_cnt: {len(df_range)}",
# )
legend_text = f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, visible_tf_cnt: {len(df_range)}"
self.update_legend(chart=chart, key=poc_line_name, text=legend_text)
self.horizontal_lines[poc_line_name] = poc_line
def check_df(self, df: pd.DataFrame):
# basic type check
if not isinstance(df, pd.DataFrame):
raise TypeError("df must be a pandas DataFrame")
required_cols = ["time", "open", "high", "low", "close", "volume"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(
f"Missing required columns: {missing}. Required: {required_cols}"
)
# time column must be datetime.datetime or pd.Timestamp (or a datetime64 dtype)
time_series = df["time"]
if not (
pd.api.types.is_datetime64_any_dtype(time_series)
or time_series.apply(
lambda x: isinstance(x, (pd.Timestamp, datetime))
).all()
):
raise TypeError(
"Column 'time' must contain datetime values (python datetime.datetime or pandas.Timestamp) "
"or be a datetime64 dtype."
)
# other columns must be numeric (int or float)
for col in ["open", "high", "low", "close", "volume"]:
if not pd.api.types.is_numeric_dtype(df[col]):
raise TypeError(f"Column '{col}' must be numeric (int or float)")
def setup_crosshair_tracking(self, chart):
chart.run_script(
f"""
{chart.id}.chart.subscribeCrosshairMove((param) => {{
if (!param.point) return;
const price = {chart.id}.series.coordinateToPrice(param.point.y);
if (price !== null) {{
window.callbackFunction(`crosshair_price_~_${{price}}`);
}}
}})
"""
)
# other columns must be numeric (int or float)
for col in ["open", "high", "low", "close", "volume"]:
if not pd.api.types.is_numeric_dtype(df[col]):
raise TypeError(f"Column '{col}' must be numeric (int or float)")
# 注册回调处理
def on_crosshair_price(price_str):
# 实时获取鼠标位置y轴的价格
price = float(price_str)
if self.visible_latest_close is not None:
profit = (self.visible_latest_close - price) / price * 100
self.update_legend(
chart=chart,
key="Crosshair Price Profit%",
text=f"Crosshair Price Profit%: {profit:.2f}%",
)
chart.win.handlers["crosshair_price"] = lambda p: on_crosshair_price(p)
def plot_chart(
df, symbol: str, name: str, timeframe: str, init_visible_num_bars: int = 90
):
# 校验数据是否满足
check_df(df)
def plot_chart(
self,
df,
symbol: str,
name: str,
timeframe: str,
init_visible_num_bars: int = 90,
):
# 校验数据是否满足
self.check_df(df)
chart = Chart(toolbox=True, inner_height=0.8, maximize=True)
chart.topbar.textbox("symbol", symbol)
chart.topbar.textbox("name", name)
chart.topbar.textbox("timeframe", timeframe)
chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
chart.set(df)
chart = Chart(toolbox=True, inner_height=0.8, maximize=True)
chart.topbar.textbox("symbol", symbol)
chart.topbar.textbox("name", name)
chart.topbar.textbox("timeframe", timeframe)
chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
chart.set(df)
# 设置刚进入chart时的可见k线数量范围
end_time = df["time"].iloc[-1]
start_time = df["time"].iloc[-init_visible_num_bars]
chart.set_visible_range(start_time, end_time)
# 设置刚进入chart时的可见k线数量范围
end_time = df["time"].iloc[-1]
start_time = df["time"].iloc[-init_visible_num_bars]
chart.set_visible_range(start_time, end_time)
# 设置每次放缩k线范围时的回调函数计算实时计算poc
chart.events.range_change += on_range_change_poc
# 设置每次放缩k线范围时的回调函数计算实时计算poc
chart.events.range_change += self.on_range_change_poc
# 添加技术指标
# add_ema(df, chart, period=10)
# add_ema(df, chart, period=20)
add_ema(df, chart, period=30)
# add_ema(df, chart, period=60)
add_macd(df, chart)
add_cci(df, chart, period=14)
add_TD(df, chart)
add_buy_sell_signal_markers(df, chart)
self.setup_crosshair_tracking(chart)
chart.show(block=True)
# 添加技术指标
# add_ema(df, chart, period=10)
# add_ema(df, chart, period=20)
self.add_ema(df, chart, period=30)
# add_ema(df, chart, period=60)
self.add_macd(df, chart)
self.add_cci(df, chart, period=14)
self.add_TD(df, chart)
self.add_buy_sell_signal_markers(df, chart)
chart.show(block=True)
if __name__ == "__main__":