Compare commits

..

10 Commits

4 changed files with 333 additions and 268 deletions

View File

@@ -27,6 +27,7 @@ RUN uv pip install --system -r requirements.txt
COPY . .
RUN rm -rf ./data
# 暴露端口
EXPOSE 80

488
chart.py
View File

@@ -3,121 +3,12 @@ import pandas as pd
from loguru import logger
import talib as ta
import numpy as np
from lightweight_charts import Chart
from lightweight_charts import Chart # lightweight-charts==2.1
import random
from datetime import datetime
horizontal_lines = {}
def get_fixed_color_based_on_period(period):
# 使用周期值作为随机种子,确保相同周期生成相同颜色
random.seed(period)
# 生成随机的RGB值
r = random.randint(0, 255)
g = random.randint(0, 255)
b = random.randint(0, 255)
# 将RGB值转换为十六进制颜色代码
color_code = "#{:02x}{:02x}{:02x}".format(r, g, b)
# 重置随机种子,避免影响其他随机操作
random.seed(None)
return color_code
def add_ema(df, chart, period: int = 50):
name = f"EMA_{period}"
df[name] = ta.EMA(df["close"], timeperiod=period)
color = get_fixed_color_based_on_period(period)
line = chart.create_line(
name, color=color, width=2, price_label=False, price_line=False
)
line.set(df[["time", name]])
def add_cci(df, chart, period: int = 14, height: float = 0.1, position: str = "bottom"):
cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period)
df[f"CCI_{period}"] = cci
cci_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
cci_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
cci_chart.time_scale(visible=False)
cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2)
cci_line.set(df[["time", f"CCI_{period}"]])
df = df[["time"]].copy()
df["h"] = 100
df["l"] = -100
cci_line = cci_chart.create_line(
name="h",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "h"]])
cci_line = cci_chart.create_line(
name="l",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "l"]])
def add_macd(
df,
chart,
fastperiod: int = 12,
slowperiod: int = 26,
signalperiod: int = 9,
height: float = 0.1,
position: str = "bottom",
):
macd, signal, hist = ta.MACD(
df["close"],
fastperiod=fastperiod,
slowperiod=slowperiod,
signalperiod=signalperiod,
)
df["DIF"] = macd
df["DEA"] = signal
macd_name = f"MACD_{fastperiod}_{slowperiod}_{signalperiod}"
df[macd_name] = hist * 2
macd_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
macd_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
macd_chart.time_scale(visible=False)
histogram = macd_chart.create_histogram(name=macd_name)
hist_data = df[["time", macd_name]].copy()
hist_data["color"] = hist_data[macd_name].apply(
lambda x: "#00FF00" if x < 0 else "#ff0000" # 绿色 : 红色
)
histogram.set(hist_data)
macd_line = macd_chart.create_line(
name="DIF", color="#2962FF", width=2, price_label=False, price_line=False
)
macd_line.set(df[["time", "DIF"]])
signal_line = macd_chart.create_line(
name="DEA", color="#FF0000", width=2, price_label=False, price_line=False
)
signal_line.set(df[["time", "DEA"]])
def TD(dataframe: pd.DataFrame):
close = dataframe["close"].to_list()
@@ -136,77 +27,6 @@ def TD(dataframe: pd.DataFrame):
return td
def add_TD(df, chart):
df["TD"] = TD(df)
td_line = chart.create_line(
name="TD",
color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条
width=0,
price_line=False,
price_label=False,
price_scale_id="td_scale",
)
td_line.precision(0)
td_line.set(df[["time", "TD"]])
TDs = df[["time", "TD"]].to_dict(orient="records")
markers = []
for item in TDs:
if item["TD"] in [9, 13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#00FF00",
"text": f"{item['TD']}",
}
)
elif item["TD"] in [-9, -13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#FF0000",
"text": f"{item['TD']}",
}
)
chart.marker_list(markers)
def add_buy_sell_signal_markers(df, chart):
if "buy" not in df.columns or "sell" not in df.columns:
return
markers = []
signals = df[["time", "buy", "sell"]].to_dict(orient="records")
for item in signals:
if item["buy"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#00FF00",
"text": f"B",
}
)
elif item["sell"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#FF0000",
"text": f"S",
}
)
chart.marker_list(markers)
def resample_data(df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
"""
对日线数据进行重采样
@@ -296,19 +116,230 @@ def POC(df, bins: int = 50):
return poc
def on_range_change_poc(chart, bars_before, bars_after):
def get_fixed_color_based_on_period(num: int):
# 使用周期值作为随机种子,确保相同周期生成相同颜色
random.seed(num)
# 生成随机的RGB值
r = random.randint(0, 255)
g = random.randint(0, 255)
b = random.randint(0, 255)
# 将RGB值转换为十六进制颜色代码
color_code = "#{:02x}{:02x}{:02x}".format(r, g, b)
# 重置随机种子,避免影响其他随机操作
random.seed(None)
return color_code
class QuantChart:
def __init__(self):
self.horizontal_lines = {}
self.legend_data = {}
self.visible_latest_close = None
def update_legend(self, chart, key, text):
self.legend_data[key] = text
sorted_dict = dict(sorted(self.legend_data.items()))
full_text = ", ".join(sorted_dict.values())
chart.legend(visible=True, text=full_text)
def add_ema(self, df, chart, period: int = 50):
name = f"EMA_{period}"
df[name] = ta.EMA(df["close"], timeperiod=period)
color = get_fixed_color_based_on_period(num=period)
line = chart.create_line(
name, color=color, width=2, price_label=False, price_line=False
)
line.set(df[["time", name]])
def add_cci(
self, df, chart, period: int = 14, height: float = 0.1, position: str = "bottom"
):
cci = ta.CCI(df["high"], df["low"], df["close"], timeperiod=period)
df[f"CCI_{period}"] = cci
cci_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
cci_chart.layout(font_family="Times New Roman")
cci_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
cci_chart.time_scale(visible=False)
cci_line = cci_chart.create_line(name=f"CCI_{period}", color="#FF0000", width=2)
cci_line.set(df[["time", f"CCI_{period}"]])
df = df[["time"]].copy()
df["h"] = 100
df["l"] = -100
cci_line = cci_chart.create_line(
name="h",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "h"]])
cci_line = cci_chart.create_line(
name="l",
color="#D4C21C",
width=1,
style="dashed",
price_label=False,
price_line=False,
)
cci_line.set(df[["time", "l"]])
def add_macd(
self,
df,
chart,
fastperiod: int = 12,
slowperiod: int = 26,
signalperiod: int = 9,
height: float = 0.1,
position: str = "bottom",
):
macd, signal, hist = ta.MACD(
df["close"],
fastperiod=fastperiod,
slowperiod=slowperiod,
signalperiod=signalperiod,
)
df["DIF"] = macd
df["DEA"] = signal
macd_name = f"MACD_{fastperiod}_{slowperiod}_{signalperiod}"
df[macd_name] = hist * 2
macd_chart = chart.create_subchart(
position=position, width=1, height=height, sync=True
)
macd_chart.layout(font_family="Times New Roman")
macd_chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
macd_chart.time_scale(visible=False)
histogram = macd_chart.create_histogram(name=macd_name)
hist_data = df[["time", macd_name]].copy()
hist_data["prev_value"] = hist_data[macd_name].shift(1)
# 设置颜色逻辑
def get_histogram_color(row):
current, prev = row[macd_name], row["prev_value"]
is_hollow = (current >= 0 and current < prev) or (
current < 0 and current > prev
)
if current >= 0:
return "rgba(255, 0, 0, 0.5)" if is_hollow else "#ff0000"
else:
return "rgba(0, 255, 0, 0.5)" if is_hollow else "#00FF00"
hist_data["color"] = hist_data.apply(get_histogram_color, axis=1)
hist_data = hist_data.drop("prev_value", axis=1)
histogram.set(hist_data)
macd_line = macd_chart.create_line(
name="DIF", color="#2962FF", width=2, price_label=False, price_line=False
)
macd_line.set(df[["time", "DIF"]])
signal_line = macd_chart.create_line(
name="DEA", color="#FF0000", width=2, price_label=False, price_line=False
)
signal_line.set(df[["time", "DEA"]])
def add_TD(self, df, chart):
df["TD"] = TD(df)
td_line = chart.create_line(
name="TD",
color="rgba(0, 0, 0, 0)", # 透明色,不在图表上显示线条
width=0,
price_line=False,
price_label=False,
price_scale_id="td_scale",
)
td_line.precision(0)
td_line.set(df[["time", "TD"]])
TDs = df[["time", "TD"]].to_dict(orient="records")
markers = []
for item in TDs:
if item["TD"] in [9, 13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d %H:%M:%S"),
"position": "above",
"shape": "arrow_down",
"color": "#00FF00",
"text": f"{item['TD']}",
}
)
elif item["TD"] in [-9, -13]:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d %H:%M:%S"),
"position": "below",
"shape": "arrow_up",
"color": "#FF0000",
"text": f"{item['TD']}",
}
)
chart.marker_list(markers)
def add_buy_sell_signal_markers(self, df, chart):
if "buy" not in df.columns or "sell" not in df.columns:
return
markers = []
signals = df[["time", "buy", "sell"]].to_dict(orient="records")
for item in signals:
if item["buy"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "below",
"shape": "arrow_up",
"color": "#00FF00",
"text": f"B",
}
)
elif item["sell"] == 1:
markers.append(
{
"time": item["time"].strftime("%Y-%m-%d"),
"position": "above",
"shape": "arrow_down",
"color": "#FF0000",
"text": f"S",
}
)
chart.marker_list(markers)
def on_range_change_poc(self, chart, bars_before, bars_after):
df = chart.candle_data
if df is None or df.empty:
return
total_bars = len(df)
# TODO: k线拉到最早会报错
if bars_after < 0:
# 计算可见范围的起始和结束索引
# if bars_after < 0:
# start_idx = max(0, int(bars_before))
# end_idx = total_bars
# elif bars_before < 0:
# start_idx = 0
# end_idx = max(0, int(total_bars - bars_after))
# else:
# start_idx = int(bars_before)
# end_idx = max(0, int(total_bars - bars_after))
start_idx = max(0, int(bars_before))
end_idx = total_bars
else:
start_idx = int(bars_before)
end_idx = max(0, int(total_bars - bars_after))
end_idx = max(0, int(total_bars - max(0, int(bars_after))))
df_range = df.iloc[start_idx:end_idx]
# logger.info(
# f"Calculating POC for bars {start_idx} to {end_idx}, total_bars={total_bars}, bars_before={bars_before}, bars_after={bars_after}"
@@ -316,23 +347,20 @@ def on_range_change_poc(chart, bars_before, bars_after):
# logger.info(f"df_range_len: {len(df_range)}")
poc = POC(df_range)
poc_line_name = "POC"
if poc_line_name in horizontal_lines:
horizontal_lines[poc_line_name].delete()
if poc_line_name in self.horizontal_lines:
self.horizontal_lines[poc_line_name].delete()
# 添加新的 POC 水平线
latest_close = df_range["close"].iloc[-1]
profit = (latest_close - poc) / poc * 100
self.visible_latest_close = df_range["close"].iloc[-1]
profit = (self.visible_latest_close - poc) / poc * 100
poc_line = chart.horizontal_line(
price=poc, color="#FF0000", width=4, style="solid", text=f"{poc:.2f}"
)
chart.legend(
visible=True,
text=f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, tf_cnt: {len(df_range)}",
)
legend_text = f"POC: {poc:.2f}, poc_range_profit%: {profit:.1f}%, visible_tf_cnt: {len(df_range)}"
self.update_legend(chart=chart, key=poc_line_name, text=legend_text)
horizontal_lines[poc_line_name] = poc_line
self.horizontal_lines[poc_line_name] = poc_line
def check_df(df: pd.DataFrame):
def check_df(self, df: pd.DataFrame):
# basic type check
if not isinstance(df, pd.DataFrame):
raise TypeError("df must be a pandas DataFrame")
@@ -348,7 +376,9 @@ def check_df(df: pd.DataFrame):
time_series = df["time"]
if not (
pd.api.types.is_datetime64_any_dtype(time_series)
or time_series.apply(lambda x: isinstance(x, (pd.Timestamp, datetime))).all()
or time_series.apply(
lambda x: isinstance(x, (pd.Timestamp, datetime))
).all()
):
raise TypeError(
"Column 'time' must contain datetime values (python datetime.datetime or pandas.Timestamp) "
@@ -360,20 +390,50 @@ def check_df(df: pd.DataFrame):
if not pd.api.types.is_numeric_dtype(df[col]):
raise TypeError(f"Column '{col}' must be numeric (int or float)")
def setup_crosshair_tracking(self, chart):
chart.run_script(
f"""
{chart.id}.chart.subscribeCrosshairMove((param) => {{
if (!param.point) return;
const price = {chart.id}.series.coordinateToPrice(param.point.y);
if (price !== null) {{
window.callbackFunction(`crosshair_price_~_${{price}}`);
}}
}})
"""
)
def plot_chart(
df, symbol: str, name: str, timeframe: str, init_visible_num_bars: int = 90
):
# 注册回调处理
def on_crosshair_price(price_str):
# 实时获取鼠标位置y轴的价格
price = float(price_str)
if self.visible_latest_close is not None:
profit = (self.visible_latest_close - price) / price * 100
self.update_legend(
chart=chart,
key="Crosshair Price Profit%",
text=f"crosshair_price_profit%: {profit:.2f}%",
)
chart.win.handlers["crosshair_price"] = lambda p: on_crosshair_price(p)
def plot_chart(
self,
df,
symbol: str,
name: str,
timeframe: str,
init_visible_num_bars: int = 90,
):
# 校验数据是否满足
check_df(df)
self.check_df(df)
chart = Chart(toolbox=True, inner_height=0.8, maximize=True)
chart.layout(font_family="Times New Roman")
chart.topbar.textbox("symbol", symbol)
chart.topbar.textbox("name", name)
chart.topbar.textbox("timeframe", timeframe)
chart.legend(
visible=True, font_size=14, color="#FFFFFF", font_family="Times New Roman"
)
chart.legend(visible=True, font_size=14, color="#FFFFFF")
chart.set(df)
# 设置刚进入chart时的可见k线数量范围
@@ -382,17 +442,19 @@ def plot_chart(
chart.set_visible_range(start_time, end_time)
# 设置每次放缩k线范围时的回调函数计算实时计算poc
chart.events.range_change += on_range_change_poc
chart.events.range_change += self.on_range_change_poc
self.setup_crosshair_tracking(chart)
# 添加技术指标
# add_ema(df, chart, period=10)
# add_ema(df, chart, period=20)
add_ema(df, chart, period=30)
self.add_ema(df, chart, period=30)
# add_ema(df, chart, period=60)
add_macd(df, chart)
add_cci(df, chart, period=14)
add_TD(df, chart)
add_buy_sell_signal_markers(df, chart)
self.add_macd(df, chart)
self.add_cci(df, chart, period=14)
self.add_TD(df, chart)
self.add_buy_sell_signal_markers(df, chart)
chart.show(block=True)

View File

@@ -1,5 +1,5 @@
import pandas as pd
from chart import plot_chart
from chart import QuantChart
if __name__ == "__main__":
symbol = "ETH_USDT"
@@ -9,4 +9,5 @@ if __name__ == "__main__":
df.rename(columns={"date": "time"}, inplace=True)
print(df.head())
plot_chart(df, symbol=symbol, name=symbol, timeframe=timeframe)
quant_chart = QuantChart()
quant_chart.plot_chart(df, symbol=symbol, name=symbol, timeframe=timeframe, init_visible_num_bars=180)

View File

@@ -1,6 +1,6 @@
import pandas as pd
from loguru import logger
from chart import plot_chart, resample_data
from chart import resample_data, QuantChart
from db_config import DatabaseManager, DatabaseConfig
@@ -28,7 +28,7 @@ def get_kline(code: str) -> list:
if __name__ == "__main__":
symbol = "399998"
symbol = "399986"
timeframe = "1D"
df = pd.read_csv(
@@ -41,4 +41,5 @@ if __name__ == "__main__":
df = resample_data(df, timeframe)
# df['buy'] = df['time'].apply(lambda x: 1 if pd.to_datetime(x).day % 2 == 1 else 0)
# df['sell'] = df['time'].apply(lambda x: 1 if pd.to_datetime(x).day % 2 == 0 else 0)
plot_chart(df=df, symbol=symbol, name=name, timeframe=timeframe)
quant_chart = QuantChart()
quant_chart.plot_chart(df=df, symbol=symbol, name=name, timeframe=timeframe)