dri*_*ver 7 python algorithm numpy matplotlib pandas
我试图获得一个在一系列点中较高的点,即枢轴高点,然后在一系列枢轴高点中我想找到一个显着的枢轴高点。为此,我试图创建一个不是预先定义的但每次都会计算的范围。它通过拐点图进行计算,以确定给出高于范围的点和低于范围的点的最佳参数。
这种方法在大量数据上效果很好。如果循环无法找到最佳参数,我会手动指定最佳的高位参数和低位参数。另外可以看到,参数值被限定在一个范围内,并且低位参数(low_parameter)还有一个条件:不能超过某个值。
以上背景知识应该足以帮助理解下面的代码。
现在我想添加一个功能:在包含重要枢轴高点、重要枢轴低点和收盘价的图上绘制趋势线。趋势线应当具有这样的特征:上升趋势线把价格图表上的重要枢轴低点连接起来,该线触及的重要枢轴低点越多,趋势线就越强。下降趋势线与重要枢轴高点的情况同理。
红色虚线和绿色虚线是当前代码绘制出的线;黑色和蓝色的连线才是我希望代码得到的结果。
我觉得自己还没有把逻辑想清楚;一旦逻辑理清,我就能把算法清楚地写出来。
代码:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema
def calculate_pivot_points(data):
    """Compute per-row pivot levels and the swing highs/lows of *data*.

    Rows are addressed by position, so the frame may carry any index (the
    reported ``'index'`` of a swing point is its 0-based row position).

    Args:
        data: DataFrame with numeric ``'high'``, ``'low'`` and ``'close'``
            columns.

    Returns:
        A 5-tuple ``(pivot_points, resistance_levels, support_levels,
        pivot_high_points, pivot_low_points)`` where

        * ``pivot_points`` holds ``(high + low + close) / 3`` for every row,
        * ``resistance_levels`` / ``support_levels`` hold one dict per row
          with keys ``R1``-``R3`` / ``S1``-``S3``,
        * ``pivot_high_points`` / ``pivot_low_points`` hold
          ``{'index': i, 'value': v}`` for rows whose high (low) is strictly
          above (below) both neighbouring rows. The first and last rows have
          only one neighbour and are never reported.
    """
    # Pull the columns out once; positional array access avoids a label
    # lookup per cell and does not depend on the frame's index labels.
    highs = data['high'].to_numpy()
    lows = data['low'].to_numpy()
    closes = data['close'].to_numpy()

    pivot_points = []
    resistance_levels = []
    support_levels = []
    pivot_high_points = []
    pivot_low_points = []
    last_row = len(data) - 1

    for i, (high, low, close) in enumerate(zip(highs, lows, closes)):
        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Calculate Resistance Levels
        resistance_levels.append({
            'R1': (2 * pivot_point) - low,
            'R2': pivot_point + (high - low),
            'R3': high + 2 * (pivot_point - low),
        })

        # Calculate Support Levels
        support_levels.append({
            'S1': (2 * pivot_point) - high,
            'S2': pivot_point - (high - low),
            'S3': low - 2 * (high - pivot_point),
        })

        # Swing points need a neighbour on both sides.
        if 0 < i < last_row:
            if high > highs[i - 1] and high > highs[i + 1]:
                pivot_high_points.append({'index': i, 'value': high})
            if low < lows[i - 1] and low < lows[i + 1]:
                pivot_low_points.append({'index': i, 'value': low})

    return pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points
def _select_significant(data, pivots, column, before, after, pick_max):
    """Keep the pivots that are the extreme of ``column`` within their window.

    The window is the label-based, end-inclusive slice
    ``data.loc[index - before : index + after, column]``.

    Args:
        data: DataFrame the pivots were computed from.
        pivots: list of ``{'index': ..., 'value': ...}`` dicts.
        column: name of the column to scan (``'high'`` or ``'low'``).
        before: number of rows to look back from the pivot.
        after: number of rows to look ahead from the pivot.
        pick_max: compare against the window maximum if true, else the minimum.

    Returns:
        The sub-list of ``pivots`` whose value equals the window extreme.
    """
    selected = []
    for point in pivots:
        window = data.loc[point['index'] - before: point['index'] + after, column]
        extreme = window.max() if pick_max else window.min()
        if point['value'] == extreme:
            selected.append(point)
    return selected


def _build_trendlines(points, keeps_trend):
    """Group consecutive points into runs whose connecting slopes pass ``keeps_trend``.

    Args:
        points: list of ``{'index': ..., 'value': ...}`` dicts in index order.
        keeps_trend: callable taking the slope between two neighbouring points
            and returning whether the segment continues the current trendline.

    Returns:
        A list of trendlines, each a list of at least two points.
    """
    trendlines = []
    current = []
    for first, second in zip(points, points[1:]):
        slope = (second['value'] - first['value']) / (second['index'] - first['index'])
        if keeps_trend(slope):
            if not current:
                current.append(first)
            current.append(second)
        else:
            # The run is broken: keep it only if it actually forms a line.
            if len(current) > 1:
                trendlines.append(current)
            current = []
    if len(current) > 1:
        trendlines.append(current)
    return trendlines


# Create a list to store all the data frames
data_frames = []

# Specify the folder path containing the CSV files
folder_path = "./data_frames"

# Iterate over each file in the folder
for filename in os.listdir(folder_path):
    if not filename.endswith(".csv"):
        continue

    file_path = os.path.join(folder_path, filename)

    # Read the data from the CSV file and remember the frame
    data = pd.read_csv(file_path)
    data_frames.append(data)

    # Extract the file name without the extension
    file_name = os.path.splitext(filename)[0]

    # Calculate pivot points and other parameters
    pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points = calculate_pivot_points(data)

    # Extract closing prices
    closing_prices = data['close']

    # Define the range of parameter values to test; the low parameter is
    # additionally capped at 8.
    parameter_range = range(1, 40)
    low_parameter_limit = 8

    # Score every parameter combination as (#significant highs - #significant lows)
    parameter_scores = []
    for high_parameter in parameter_range:
        for low_parameter in parameter_range:
            if low_parameter > low_parameter_limit:
                break  # the range is ascending, so no later value can qualify
            candidate_highs = _select_significant(
                data, pivot_high_points, 'high', high_parameter, low_parameter, True)
            candidate_lows = _select_significant(
                data, pivot_low_points, 'low', high_parameter, low_parameter, False)
            score = len(candidate_highs) - len(candidate_lows)
            parameter_scores.append((high_parameter, low_parameter, score))

    # Convert the scores to a NumPy array for easier manipulation
    scores = np.array(parameter_scores)

    # Find the optimal parameter values using the knee point (the last local
    # minimum of the score sequence). argrelextrema returns an empty index
    # array when there is no local minimum, so fall back to manual values
    # instead of indexing into it.
    knee_candidates = argrelextrema(scores[:, 2], np.less)[0] if len(scores) > 0 else []
    if len(knee_candidates) > 0:
        knee_index = knee_candidates[-1]
        optimal_high_parameter, optimal_low_parameter, optimal_score = scores[knee_index]
    else:
        optimal_high_parameter = 16  # Manually assign the value
        optimal_low_parameter = 2  # Manually assign the value
        optimal_score = None

    print("Optimal high parameter value:", optimal_high_parameter)
    print("Optimal low parameter value:", optimal_low_parameter)

    # Recompute the significant points with the chosen parameters; otherwise
    # the plot would show whatever the last tested combination produced.
    significant_high_points = _select_significant(
        data, pivot_high_points, 'high', optimal_high_parameter, optimal_low_parameter, True)
    significant_low_points = _select_significant(
        data, pivot_low_points, 'low', optimal_high_parameter, optimal_low_parameter, False)

    # Plot line chart for closing prices
    plt.plot(closing_prices, label='Closing Prices')

    # Highs are chained while the slope between neighbours is positive,
    # lows while it is negative.
    trendlines_high = _build_trendlines(significant_high_points, lambda slope: slope > 0)
    trendlines_low = _build_trendlines(significant_low_points, lambda slope: slope < 0)

    # Plot the trendlines through the significant pivot highs
    for trendline in trendlines_high:
        x_values = [point['index'] for point in trendline]
        y_values = [point['value'] for point in trendline]
        plt.plot(x_values, y_values, color='red', linestyle='dashed')

    # Plot the significant pivot high points
    x_values = [point['index'] for point in significant_high_points]
    y_values = [point['value'] for point in significant_high_points]
    plt.scatter(x_values, y_values, color='red', label='Significant Pivot High Points')

    # Plot the trendlines through the significant pivot lows
    for trendline in trendlines_low:
        x_values = [point['index'] for point in trendline]
        y_values = [point['value'] for point in trendline]
        plt.plot(x_values, y_values, color='green', linestyle='dashed')

    # Plot the significant pivot low points
    x_values = [point['index'] for point in significant_low_points]
    y_values = [point['value'] for point in significant_low_points]
    plt.scatter(x_values, y_values, color='green', label='Significant Pivot Low Points')

    # Set chart title and labels
    plt.title(f'Closing Prices with Trendlines and Significant Pivot Points ({file_name})')
    plt.xlabel('Index')
    plt.ylabel('Closing Price')

    # Show the chart for the current data frame
    plt.legend()
    plt.show()
Run Code Online (Sandbox Code Playgroud)
如果您想自己尝试代码,可以在此驱动器链接中找到数据: 链接
PS:在当前代码中,我只是检查两个点是否位于同一条直线趋势线上。在很长一段时间内都不会出现这种情况。因此,我的想法是我们定义一个范围,如果首先第 n 点和第 n+1 点之间的斜率 > 或 < 0,那么我们继续处理接下来的两个点,即第 n+1 点和第 n+2 点。这里,如果两个斜率之间的差异,即n和n+1之间以及n+1和n+2之间的斜率在一定范围内,那么我们可以将主要斜率变量转移到n和n+2之间的斜率,并类似地运行循环。这将是一个很好的开始,但现在我被编码部分困住了。如果有人可以帮我编写代码,那将非常有帮助。
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema
def calculate_pivot_points(data):
    """Compute per-row pivot levels and the swing highs/lows of *data*.

    Rows are addressed by position, so the frame may carry any index (the
    reported ``'index'`` of a swing point is its 0-based row position).

    Args:
        data: DataFrame with numeric ``'high'``, ``'low'`` and ``'close'``
            columns.

    Returns:
        A 5-tuple ``(pivot_points, resistance_levels, support_levels,
        pivot_high_points, pivot_low_points)`` where

        * ``pivot_points`` holds ``(high + low + close) / 3`` for every row,
        * ``resistance_levels`` / ``support_levels`` hold one dict per row
          with keys ``R1``-``R3`` / ``S1``-``S3``,
        * ``pivot_high_points`` / ``pivot_low_points`` hold
          ``{'index': i, 'value': v}`` for rows whose high (low) is strictly
          above (below) both neighbouring rows. The first and last rows have
          only one neighbour and are never reported.
    """
    # Pull the columns out once; positional array access avoids a label
    # lookup per cell and does not depend on the frame's index labels.
    highs = data['high'].to_numpy()
    lows = data['low'].to_numpy()
    closes = data['close'].to_numpy()

    pivot_points = []
    resistance_levels = []
    support_levels = []
    pivot_high_points = []
    pivot_low_points = []
    last_row = len(data) - 1

    for i, (high, low, close) in enumerate(zip(highs, lows, closes)):
        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Calculate Resistance Levels
        resistance_levels.append({
            'R1': (2 * pivot_point) - low,
            'R2': pivot_point + (high - low),
            'R3': high + 2 * (pivot_point - low),
        })

        # Calculate Support Levels
        support_levels.append({
            'S1': (2 * pivot_point) - high,
            'S2': pivot_point - (high - low),
            'S3': low - 2 * (high - pivot_point),
        })

        # Swing points need a neighbour on both sides.
        if 0 < i < last_row:
            if high > highs[i - 1] and high > highs[i + 1]:
                pivot_high_points.append({'index': i, 'value': high})
            if low < lows[i - 1] and low < lows[i + 1]:
                pivot_low_points.append({'index': i, 'value': low})

    return pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points
def _select_significant(data, pivots, column, before, after, pick_max):
    """Keep the pivots that are the extreme of ``column`` within their window.

    The window is the label-based, end-inclusive slice
    ``data.loc[index - before : index + after, column]``.

    Args:
        data: DataFrame the pivots were computed from.
        pivots: list of ``{'index': ..., 'value': ...}`` dicts.
        column: name of the column to scan (``'high'`` or ``'low'``).
        before: number of rows to look back from the pivot.
        after: number of rows to look ahead from the pivot.
        pick_max: compare against the window maximum if true, else the minimum.

    Returns:
        The sub-list of ``pivots`` whose value equals the window extreme.
    """
    selected = []
    for point in pivots:
        window = data.loc[point['index'] - before: point['index'] + after, column]
        extreme = window.max() if pick_max else window.min()
        if point['value'] == extreme:
            selected.append(point)
    return selected


def _build_tolerant_trendlines(points, keeps_trend, slope_range):
    """Chain points into trendlines, comparing each segment with the next one.

    For every three consecutive points the slope of the first segment decides
    whether the trend continues (``keeps_trend``); the middle point is only
    added when the two neighbouring slopes differ by at most ``slope_range``.

    Args:
        points: list of ``{'index': ..., 'value': ...}`` dicts in index order.
        keeps_trend: callable taking the first segment's slope and returning
            whether the current trendline continues.
        slope_range: largest allowed absolute difference between the slopes
            of two neighbouring segments.

    Returns:
        A list of trendlines, each a list of at least two points.
    """
    # NOTE(review): the nesting below is reconstructed from a paste that lost
    # its indentation -- confirm it matches the intended logic.
    trendlines = []
    current = []
    for first, second, third in zip(points, points[1:], points[2:]):
        slope1 = (second['value'] - first['value']) / (second['index'] - first['index'])
        slope2 = (third['value'] - second['value']) / (third['index'] - second['index'])
        slope_difference = abs(slope2 - slope1)
        if keeps_trend(slope1):
            if not current:
                current.append(first)
            if slope_difference <= slope_range:
                current.append(second)
        else:
            if len(current) > 1:
                trendlines.append(current)
            current = [second]  # Start a new trendline with the middle point
    if len(current) > 1:
        trendlines.append(current)
    return trendlines


# Create a list to store all the data frames
data_frames = []

# Specify the folder path containing the CSV files
folder_path = "./data_frames"

# Iterate over each file in the folder
for filename in os.listdir(folder_path):
    if not filename.endswith(".csv"):
        continue

    file_path = os.path.join(folder_path, filename)

    # Read the data from the CSV file and remember the frame
    data = pd.read_csv(file_path)
    data_frames.append(data)

    # Extract the file name without the extension
    file_name = os.path.splitext(filename)[0]

    # Calculate pivot points and other parameters
    pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points = calculate_pivot_points(data)

    # Extract closing prices
    closing_prices = data['close']

    # Define the range of parameter values to test; the low parameter is
    # additionally capped at 8.
    parameter_range = range(1, 40)
    low_parameter_limit = 8

    # Score every parameter combination as (#significant highs - #significant lows)
    parameter_scores = []
    for high_parameter in parameter_range:
        for low_parameter in parameter_range:
            if low_parameter > low_parameter_limit:
                break  # the range is ascending, so no later value can qualify
            candidate_highs = _select_significant(
                data, pivot_high_points, 'high', high_parameter, low_parameter, True)
            candidate_lows = _select_significant(
                data, pivot_low_points, 'low', high_parameter, low_parameter, False)
            score = len(candidate_highs) - len(candidate_lows)
            parameter_scores.append((high_parameter, low_parameter, score))

    # Convert the scores to a NumPy array for easier manipulation
    scores = np.array(parameter_scores)

    # Find the optimal parameter values using the knee point (the last local
    # minimum of the score sequence). argrelextrema returns an empty index
    # array when there is no local minimum, so fall back to manual values
    # instead of indexing into it.
    knee_candidates = argrelextrema(scores[:, 2], np.less)[0] if len(scores) > 0 else []
    if len(knee_candidates) > 0:
        knee_index = knee_candidates[-1]
        optimal_high_parameter, optimal_low_parameter, optimal_score = scores[knee_index]
    else:
        optimal_high_parameter = 16  # Manually assign the value
        optimal_low_parameter = 2  # Manually assign the value
        optimal_score = None

    print("Optimal high parameter value:", optimal_high_parameter)
    print("Optimal low parameter value:", optimal_low_parameter)

    # Recompute the significant points with the chosen parameters; otherwise
    # the plot would show whatever the last tested combination produced.
    significant_high_points = _select_significant(
        data, pivot_high_points, 'high', optimal_high_parameter, optimal_low_parameter, True)
    significant_low_points = _select_significant(
        data, pivot_low_points, 'low', optimal_high_parameter, optimal_low_parameter, False)

    # Plot line chart for closing prices
    plt.plot(closing_prices, label='Closing Prices')

    slope_range = 1  # Adjust this range as per your requirement

    # Highs are chained while the leading slope is negative, lows while it is
    # positive.
    trendlines_high = _build_tolerant_trendlines(
        significant_high_points, lambda slope: slope < 0, slope_range)
    trendlines_low = _build_tolerant_trendlines(
        significant_low_points, lambda slope: slope > 0, slope_range)

    # Plot the trendlines through the significant pivot highs
    for trendline in trendlines_high:
        x_values = [point['index'] for point in trendline]
        y_values = [point['value'] for point in trendline]
        plt.plot(x_values, y_values, color='red', linestyle='dashed')

    # Plot the significant pivot high points
    x_values = [point['index'] for point in significant_high_points]
    y_values = [point['value'] for point in significant_high_points]
    plt.scatter(x_values, y_values, color='red', label='Significant Pivot High Points')

    # Plot the trendlines through the significant pivot lows
    for trendline in trendlines_low:
        x_values = [point['index'] for point in trendline]
        y_values = [point['value'] for point in trendline]
        plt.plot(x_values, y_values, color='green', linestyle='dashed')

    # Plot the significant pivot low points
    x_values = [point['index'] for point in significant_low_points]
    y_values = [point['value'] for point in significant_low_points]
    plt.scatter(x_values, y_values, color='green', label='Significant Pivot Low Points')

    # Set chart title and labels
    plt.title(f'Closing Prices with Trendlines and Significant Pivot Points ({file_name})')
    plt.xlabel('Index')
    plt.ylabel('Closing Price')

    # Show the chart for the current data frame
    plt.legend()
    plt.show()
Run Code Online (Sandbox Code Playgroud)
根据我刚才所说的逻辑,这是我的新方法,但绘制出的结果仍然与我们想要的相去甚远。
簇的数量是硬编码的(可以通过变量 n_clusters 轻松更改);在更完善的版本中,应当根据数据本身推导出最佳簇数(这也是引入 mean_squared_error 的原因,不过我最终没有在本演示中用到该指标:用它来选择时,总是会把可能的最大簇数判为最优。应该存在更好的办法来确定理想的簇数,不过通过少量试错来手动调整这个参数也并不困难或耗时)。
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
def fit_line_to_cluster(df, cluster_label):
    """Fit a straight line ``value ~ index`` to the points of one cluster.

    Args:
        df: DataFrame with ``"index"`` and ``"value"`` columns holding the
            cluster's points.
        cluster_label: label of the cluster; accepted but not used by the fit.

    Returns:
        ``(reg, mse)``: the fitted ``LinearRegression`` and the mean squared
        error of its predictions on the same points it was fitted to.
    """
    features = df[["index"]]
    target = df["value"]

    regression = LinearRegression()
    regression.fit(features, target)

    fitted_values = regression.predict(features)
    return regression, mean_squared_error(target, fitted_values)
def calculate_pivot_points(data):
    """Compute the per-row pivot point and the swing highs/lows of *data*.

    Rows are addressed by position, so the frame may carry any index (the
    reported ``"index"`` of a swing point is its 0-based row position).

    Args:
        data: DataFrame with numeric ``"high"``, ``"low"`` and ``"close"``
            columns.

    Returns:
        A 3-tuple ``(pivot_points, pivot_high_points, pivot_low_points)``:
        ``pivot_points`` holds ``(high + low + close) / 3`` for every row;
        the other two hold ``{"index": i, "value": v}`` for rows whose high
        (low) is strictly above (below) both neighbouring rows. The first
        and last rows have only one neighbour and are never reported.
    """
    # Pull the columns out once; positional array access avoids a label
    # lookup per cell and does not depend on the frame's index labels.
    highs = data["high"].to_numpy()
    lows = data["low"].to_numpy()
    closes = data["close"].to_numpy()

    pivot_points = []
    pivot_high_points = []
    pivot_low_points = []
    last_row = len(data) - 1

    for i, (high, low, close) in enumerate(zip(highs, lows, closes)):
        # Calculate Pivot Point
        pivot_points.append((high + low + close) / 3)

        # Swing points need a neighbour on both sides.
        if 0 < i < last_row:
            if high > highs[i - 1] and high > highs[i + 1]:
                pivot_high_points.append({"index": i, "value": high})
            if low < lows[i - 1] and low < lows[i + 1]:
                pivot_low_points.append({"index": i, "value": low})

    return (
        pivot_points,
        pivot_high_points,
        pivot_low_points,
    )
def add_fitted_lines_to_plotly(fig, df, models, colors):
    """Add one fitted-line trace per cluster model to a plotly figure.

    Args:
        fig: figure that receives the traces via ``fig.add_trace``.
        df: DataFrame with ``"cluster"`` and ``"index"`` columns.
        models: fitted models; the model at position ``k`` is evaluated on the
            ``"index"`` values of the rows whose ``"cluster"`` equals ``k``.
        colors: sequence of line colours, indexed like ``models``.
    """
    for label, regression in enumerate(models):
        in_cluster = df["cluster"] == label
        xs = df.loc[in_cluster, "index"].values
        # predict() is given a 2-D column, hence the reshape.
        ys = regression.predict(xs.reshape(-1, 1))
        line_trace = go.Scatter(
            x=xs,
            y=ys,
            mode="lines",
            line={"color": colors[label]},
        )
        fig.add_trace(line_trace)
### Data Analysis
# Load the price data and derive the swing highs / lows from it.
df = pd.read_excel("~/Downloads/data3.xlsx")
df["time"] = pd.to_datetime(df["timestamp"])
pivot_points, pivot_high_points, pivot_low_points = calculate_pivot_points(df)
high_df = pd.DataFrame(pivot_high_points)
low_df = pd.DataFrame(pivot_low_points)

## Clustering
# The number of clusters is fixed here; one line is fitted per cluster.
n_clusters = 20

# For high points: cluster on (index, value), then fit a line to each cluster
kmeans_high = KMeans(n_clusters=n_clusters, random_state=0)
kmeans_high.fit(high_df[["index", "value"]])
high_df["cluster"] = kmeans_high.labels_
optimal_high_models = [
    fit_line_to_cluster(high_df[high_df["cluster"] == label], label)[0]
    for label in range(n_clusters)
]

# For low points: same procedure
kmeans_low = KMeans(n_clusters=n_clusters, random_state=0)
kmeans_low.fit(low_df[["index", "value"]])
low_df["cluster"] = kmeans_low.labels_
optimal_low_models = [
    fit_line_to_cluster(low_df[low_df["cluster"] == label], label)[0]
    for label in range(n_clusters)
]

closing_prices = df["close"].values

### Plotting
fig = go.Figure()

# Closing prices as a line over the row positions
fig.add_trace(
    go.Scatter(
        x=list(range(len(closing_prices))),
        y=closing_prices,
        mode="lines",
        name="Closing Prices",
        line=dict(color="blue"),
        opacity=0.75,
    )
)

# High and low pivot points as markers (same styling, different colour/name)
for frame, trace_name, marker_color in (
    (high_df, "High Points", "red"),
    (low_df, "Low Points", "green"),
):
    fig.add_trace(
        go.Scatter(
            x=frame["index"],
            y=frame["value"],
            mode="markers",
            name=trace_name,
            marker=dict(color=marker_color),
            opacity=0.75,
            marker_size=5,
        )
    )

# Add the fitted trendlines for the high and low clusters
add_fitted_lines_to_plotly(
    fig, high_df, optimal_high_models, ["magenta"] * n_clusters
)
add_fitted_lines_to_plotly(
    fig, low_df, optimal_low_models, ["cyan"] * n_clusters
)

# Set chart title and labels
fig.update_layout(
    title="High and Low Pivot Points with Clustered Trendlines",
    xaxis_title="Time",
    yaxis_title="Price",
)
fig.show()
Run Code Online (Sandbox Code Playgroud)