在确定重要枢轴点后,在特定条件下绘制趋势线

dri*_*ver 7 python algorithm numpy matplotlib pandas

我想先在一系列点中找出局部较高的点,即枢轴高点(pivot high),然后再从这些枢轴高点中筛选出重要的(significant)枢轴高点。为此,我没有使用预先定义的固定范围,而是每次都根据数据重新计算范围:通过拐点(knee point)分析来确定最佳参数,即在该点之前和之后各需要比较多少个点。

这种方法在数据量较大时效果很好。如果循环无法找到最佳参数,我会手动指定最佳的 high 参数和 low 参数。另外,参数值被限定在一个范围内(代码中为 1 到 39),并且 low 参数还有一个附加条件:不能超过某个上限(代码中为 8)。

以上背景知识应该足以帮助理解下面的代码。

现在我想添加一个功能:在包含重要枢轴高点、重要枢轴低点和收盘价的图中绘制趋势线。趋势线应当满足这样的特征:上升趋势线把价格图表上的重要枢轴低点连接起来,这条线触及的重要枢轴低点越多,趋势线就越强。下降趋势线和重要枢轴低点的情况也是如此。

我的代码当前绘制的内容类似于:在此输入图像描述

红色虚线和绿色虚线分别代表当前正在绘制的线。黑色和蓝色的连接线是我希望从代码中得到的东西。

我认为问题在于我还没有把逻辑想清楚;一旦理清思路,我就能顺利写出算法。

代码:

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema

def calculate_pivot_points(data):
    """Compute per-row pivot levels and locate swing highs/lows.

    Rows are read by position, so the frame may carry any index; the
    'index' stored for a swing point is its 0-based row position.

    Args:
        data: DataFrame with numeric 'high', 'low' and 'close' columns.

    Returns:
        Tuple ``(pivot_points, resistance_levels, support_levels,
        pivot_high_points, pivot_low_points)`` where

        * ``pivot_points`` holds ``(high + low + close) / 3`` for each row,
        * ``resistance_levels`` / ``support_levels`` hold one dict per row,
          keyed 'R1'..'R3' / 'S1'..'S3',
        * ``pivot_high_points`` / ``pivot_low_points`` hold
          ``{'index': row, 'value': price}`` dicts for every interior row
          whose high (low) is strictly above (below) both neighbours.
    """
    # Pull the columns out once: scalar ``data.loc[i, col]`` lookups inside
    # the loop are slow and only work when the index is exactly 0..n-1.
    highs = data['high'].to_numpy()
    lows = data['low'].to_numpy()
    closes = data['close'].to_numpy()
    last_row = len(highs) - 1

    pivot_points = []
    resistance_levels = []
    support_levels = []
    pivot_high_points = []
    pivot_low_points = []

    for i, (high, low, close) in enumerate(zip(highs, lows, closes)):
        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Calculate Resistance Levels
        resistance_levels.append({
            'R1': (2 * pivot_point) - low,
            'R2': pivot_point + (high - low),
            'R3': high + 2 * (pivot_point - low),
        })

        # Calculate Support Levels
        support_levels.append({
            'S1': (2 * pivot_point) - high,
            'S2': pivot_point - (high - low),
            'S3': low - 2 * (high - pivot_point),
        })

        # The first and last rows have only one neighbour, so they can
        # never qualify as swing points.
        if 0 < i < last_row:
            if high > highs[i - 1] and high > highs[i + 1]:
                pivot_high_points.append({'index': i, 'value': high})
            if low < lows[i - 1] and low < lows[i + 1]:
                pivot_low_points.append({'index': i, 'value': low})

    return pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points

# Create a list to store all the data frames
data_frames = []

# Specify the folder path containing the CSV files
folder_path = "./data_frames"


def _significant_points(data, points, column, before, after, extreme):
    """Return the pivots that are the extreme value of their own window.

    Args:
        data: Price DataFrame. It is sliced by label with ``.loc``
            (inclusive on both ends), so row labels are expected to match
            the pivots' 'index' values.
        points: Pivot dicts with 'index' and 'value' keys.
        column: Column to inspect ('high' or 'low').
        before: Number of rows to look back from the pivot.
        after: Number of rows to look ahead from the pivot.
        extreme: Name of the Series reduction the pivot must equal
            ('max' for highs, 'min' for lows).
    """
    last_row = len(data) - 1
    selected = []
    for point in points:
        position = point['index']
        if 0 < position < last_row:
            window = data.loc[position - before: position + after, column]
            if point['value'] == getattr(window, extreme)():
                selected.append(point)
    return selected


def _directional_runs(points, direction):
    """Group consecutive points whose connecting slope follows ``direction``.

    ``direction`` is +1 to keep rising segments and -1 to keep falling
    ones. A segment running the other way (or flat) closes the current
    run; runs with fewer than two points are discarded.
    """
    runs = []
    current = []
    for point1, point2 in zip(points, points[1:]):
        slope = (point2['value'] - point1['value']) / (point2['index'] - point1['index'])
        if slope * direction > 0:
            if not current:
                current.append(point1)
            current.append(point2)
        else:
            if len(current) > 1:
                runs.append(current)
            current = []
    if len(current) > 1:
        runs.append(current)
    return runs


# Iterate over each file in the folder
for filename in os.listdir(folder_path):
    if not filename.endswith(".csv"):
        continue
    file_path = os.path.join(folder_path, filename)

    # Read the data from the CSV file
    data = pd.read_csv(file_path)

    # Add the data frame to the list
    data_frames.append(data)

    # Extract the file name without the extension
    file_name = os.path.splitext(filename)[0]

    # Calculate pivot points and other parameters
    (pivot_points, resistance_levels, support_levels,
     pivot_high_points, pivot_low_points) = calculate_pivot_points(data)

    # Extract closing prices
    closing_prices = data['close']

    # Define the range of parameter values to test
    parameter_range = range(1, 40)

    # Calculate scores for different parameter combinations
    parameter_scores = []
    for high_parameter in parameter_range:
        for low_parameter in parameter_range:
            if low_parameter > 8:  # the look-ahead window is capped at 8 rows
                continue
            candidate_highs = _significant_points(
                data, pivot_high_points, 'high', high_parameter, low_parameter, 'max')
            candidate_lows = _significant_points(
                data, pivot_low_points, 'low', high_parameter, low_parameter, 'min')

            # Calculate the score as the difference between high and low point counts
            score = len(candidate_highs) - len(candidate_lows)
            parameter_scores.append((high_parameter, low_parameter, score))

    # Convert the scores to a NumPy array for easier manipulation
    scores = np.array(parameter_scores)

    # Find the optimal parameter values using the knee point.
    # argrelextrema returns an empty index array when the score column has
    # no strict local minimum; indexing it with [-1] would raise, so fall
    # back to the manual values in that case as well.
    knee_candidates = argrelextrema(scores[:, 2], np.less)[0] if len(scores) > 0 else []
    if len(knee_candidates) > 0:
        knee_index = knee_candidates[-1]
        optimal_high_parameter, optimal_low_parameter, optimal_score = (
            int(value) for value in scores[knee_index]
        )
    else:
        optimal_high_parameter = 16  # Manually assign the value
        optimal_low_parameter = 2  # Manually assign the value

    print("Optimal high parameter value:", optimal_high_parameter)
    print("Optimal low parameter value:", optimal_low_parameter)

    # Re-select the significant pivots with the optimal window. Without this
    # step the plot would use whatever the last grid combination (39, 8)
    # happened to leave behind, ignoring the parameters chosen above.
    significant_high_points = _significant_points(
        data, pivot_high_points, 'high',
        optimal_high_parameter, optimal_low_parameter, 'max')
    significant_low_points = _significant_points(
        data, pivot_low_points, 'low',
        optimal_high_parameter, optimal_low_parameter, 'min')

    # Plot line chart for closing prices
    plt.plot(closing_prices, label='Closing Prices')

    # Trendlines through runs of rising significant highs
    trendlines_high = _directional_runs(significant_high_points, 1)

    # Trendlines through runs of falling significant lows
    trendlines_low = _directional_runs(significant_low_points, -1)

    # Plot the trendlines for positive slope (pivot highs)
    for trendline_points_high in trendlines_high:
        x_values = [point['index'] for point in trendline_points_high]
        y_values = [point['value'] for point in trendline_points_high]
        plt.plot(x_values, y_values, color='red', linestyle='dashed')

    # Plot the significant pivot high points
    x_values = [point['index'] for point in significant_high_points]
    y_values = [point['value'] for point in significant_high_points]
    plt.scatter(x_values, y_values, color='red', label='Significant Pivot High Points')

    # Plot the trendlines for negative slope (pivot lows)
    for trendline_points_low in trendlines_low:
        x_values = [point['index'] for point in trendline_points_low]
        y_values = [point['value'] for point in trendline_points_low]
        plt.plot(x_values, y_values, color='green', linestyle='dashed')

    # Plot the significant pivot low points
    x_values = [point['index'] for point in significant_low_points]
    y_values = [point['value'] for point in significant_low_points]
    plt.scatter(x_values, y_values, color='green', label='Significant Pivot Low Points')

    # Set chart title and labels
    plt.title(f'Closing Prices with Trendlines and Significant Pivot Points ({file_name})')
    plt.xlabel('Index')
    plt.ylabel('Closing Price')

    # Show the chart for the current data frame
    plt.legend()
    plt.show()
Run Code Online (Sandbox Code Playgroud)

如果您想自己尝试代码,可以在此驱动器链接中找到数据: 链接

PS:在当前代码中,我只是检查相邻两个点是否位于同一条直线趋势线上,而在较长的时间跨度内,这种情况并不成立。因此我的想法是定义一个斜率容差范围:首先,如果第 n 点与第 n+1 点之间的斜率大于 0(或小于 0),就继续处理接下来的两个点,即第 n+1 点和第 n+2 点;如果这两段斜率(n 到 n+1,以及 n+1 到 n+2)之差在该容差范围内,就把主斜率变量更新为第 n 点与第 n+2 点之间的斜率,然后以同样的方式继续循环。这会是一个不错的起点,但我现在卡在了编码部分。如果有人能帮我写出这段代码,那将非常有帮助。

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema

def calculate_pivot_points(data):
    """Compute per-row pivot levels and locate swing highs/lows.

    Rows are read by position, so the frame may carry any index; the
    'index' stored for a swing point is its 0-based row position.

    Args:
        data: DataFrame with numeric 'high', 'low' and 'close' columns.

    Returns:
        Tuple ``(pivot_points, resistance_levels, support_levels,
        pivot_high_points, pivot_low_points)`` where

        * ``pivot_points`` holds ``(high + low + close) / 3`` for each row,
        * ``resistance_levels`` / ``support_levels`` hold one dict per row,
          keyed 'R1'..'R3' / 'S1'..'S3',
        * ``pivot_high_points`` / ``pivot_low_points`` hold
          ``{'index': row, 'value': price}`` dicts for every interior row
          whose high (low) is strictly above (below) both neighbours.
    """
    # Pull the columns out once: scalar ``data.loc[i, col]`` lookups inside
    # the loop are slow and only work when the index is exactly 0..n-1.
    highs = data['high'].to_numpy()
    lows = data['low'].to_numpy()
    closes = data['close'].to_numpy()
    last_row = len(highs) - 1

    pivot_points = []
    resistance_levels = []
    support_levels = []
    pivot_high_points = []
    pivot_low_points = []

    for i, (high, low, close) in enumerate(zip(highs, lows, closes)):
        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Calculate Resistance Levels
        resistance_levels.append({
            'R1': (2 * pivot_point) - low,
            'R2': pivot_point + (high - low),
            'R3': high + 2 * (pivot_point - low),
        })

        # Calculate Support Levels
        support_levels.append({
            'S1': (2 * pivot_point) - high,
            'S2': pivot_point - (high - low),
            'S3': low - 2 * (high - pivot_point),
        })

        # The first and last rows have only one neighbour, so they can
        # never qualify as swing points.
        if 0 < i < last_row:
            if high > highs[i - 1] and high > highs[i + 1]:
                pivot_high_points.append({'index': i, 'value': high})
            if low < lows[i - 1] and low < lows[i + 1]:
                pivot_low_points.append({'index': i, 'value': low})

    return pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points

# Create a list to store all the data frames
data_frames = []

# Specify the folder path containing the CSV files
folder_path = "./data_frames"


def _significant_points(data, points, column, before, after, extreme):
    """Return the pivots that are the extreme value of their own window.

    Args:
        data: Price DataFrame. It is sliced by label with ``.loc``
            (inclusive on both ends), so row labels are expected to match
            the pivots' 'index' values.
        points: Pivot dicts with 'index' and 'value' keys.
        column: Column to inspect ('high' or 'low').
        before: Number of rows to look back from the pivot.
        after: Number of rows to look ahead from the pivot.
        extreme: Name of the Series reduction the pivot must equal
            ('max' for highs, 'min' for lows).
    """
    last_row = len(data) - 1
    selected = []
    for point in points:
        position = point['index']
        if 0 < position < last_row:
            window = data.loc[position - before: position + after, column]
            if point['value'] == getattr(window, extreme)():
                selected.append(point)
    return selected


def _slope_tolerant_runs(points, direction, slope_range):
    """Group pivots into trendlines by comparing neighbouring segment slopes.

    Every triple of consecutive pivots yields two slopes: ``slope1`` for the
    first segment and ``slope2`` for the second. Triples whose first segment
    does not run in ``direction`` (+1 rising, -1 falling) are skipped. For
    the others, the middle pivot extends the current line when the two
    slopes differ by at most ``slope_range``; otherwise the current line is
    closed and a new one starts at the middle pivot. Lines with fewer than
    two points are discarded.
    """
    runs = []
    current = []
    for point1, point2, point3 in zip(points, points[1:], points[2:]):
        slope1 = (point2['value'] - point1['value']) / (point2['index'] - point1['index'])
        slope2 = (point3['value'] - point2['value']) / (point3['index'] - point2['index'])

        # NOTE(review): a segment running the other way is ignored rather
        # than closing the current line, and the final pair of pivots is
        # never examined. Confirm whether that is intended.
        if not slope1 * direction > 0:
            continue

        if not current:
            current.append(point1)
        if abs(slope2 - slope1) <= slope_range:
            current.append(point2)
        else:
            if len(current) > 1:
                runs.append(current)
            current = [point2]  # Start a new trendline with point2

    if len(current) > 1:
        runs.append(current)
    return runs


# Iterate over each file in the folder
for filename in os.listdir(folder_path):
    if not filename.endswith(".csv"):
        continue
    file_path = os.path.join(folder_path, filename)

    # Read the data from the CSV file
    data = pd.read_csv(file_path)

    # Add the data frame to the list
    data_frames.append(data)

    # Extract the file name without the extension
    file_name = os.path.splitext(filename)[0]

    # Calculate pivot points and other parameters
    (pivot_points, resistance_levels, support_levels,
     pivot_high_points, pivot_low_points) = calculate_pivot_points(data)

    # Extract closing prices
    closing_prices = data['close']

    # Define the range of parameter values to test
    parameter_range = range(1, 40)

    # Calculate scores for different parameter combinations
    parameter_scores = []
    for high_parameter in parameter_range:
        for low_parameter in parameter_range:
            if low_parameter > 8:  # the look-ahead window is capped at 8 rows
                continue
            candidate_highs = _significant_points(
                data, pivot_high_points, 'high', high_parameter, low_parameter, 'max')
            candidate_lows = _significant_points(
                data, pivot_low_points, 'low', high_parameter, low_parameter, 'min')

            # Calculate the score as the difference between high and low point counts
            score = len(candidate_highs) - len(candidate_lows)
            parameter_scores.append((high_parameter, low_parameter, score))

    # Convert the scores to a NumPy array for easier manipulation
    scores = np.array(parameter_scores)

    # Find the optimal parameter values using the knee point.
    # argrelextrema returns an empty index array when the score column has
    # no strict local minimum; indexing it with [-1] would raise, so fall
    # back to the manual values in that case as well.
    knee_candidates = argrelextrema(scores[:, 2], np.less)[0] if len(scores) > 0 else []
    if len(knee_candidates) > 0:
        knee_index = knee_candidates[-1]
        optimal_high_parameter, optimal_low_parameter, optimal_score = (
            int(value) for value in scores[knee_index]
        )
    else:
        optimal_high_parameter = 16  # Manually assign the value
        optimal_low_parameter = 2  # Manually assign the value

    print("Optimal high parameter value:", optimal_high_parameter)
    print("Optimal low parameter value:", optimal_low_parameter)

    # Re-select the significant pivots with the optimal window. Without this
    # step the plot would use whatever the last grid combination (39, 8)
    # happened to leave behind, ignoring the parameters chosen above.
    significant_high_points = _significant_points(
        data, pivot_high_points, 'high',
        optimal_high_parameter, optimal_low_parameter, 'max')
    significant_low_points = _significant_points(
        data, pivot_low_points, 'low',
        optimal_high_parameter, optimal_low_parameter, 'min')

    # Plot line chart for closing prices
    plt.plot(closing_prices, label='Closing Prices')

    slope_range = 1  # Adjust this range as per your requirement

    # Trendlines through falling significant highs
    trendlines_high = _slope_tolerant_runs(significant_high_points, -1, slope_range)

    # Trendlines through rising significant lows
    trendlines_low = _slope_tolerant_runs(significant_low_points, 1, slope_range)

    # Plot the trendlines through the pivot highs
    for trendline_points_high in trendlines_high:
        x_values = [point['index'] for point in trendline_points_high]
        y_values = [point['value'] for point in trendline_points_high]
        plt.plot(x_values, y_values, color='red', linestyle='dashed')

    # Plot the significant pivot high points
    x_values = [point['index'] for point in significant_high_points]
    y_values = [point['value'] for point in significant_high_points]
    plt.scatter(x_values, y_values, color='red', label='Significant Pivot High Points')

    # Plot the trendlines through the pivot lows
    for trendline_points_low in trendlines_low:
        x_values = [point['index'] for point in trendline_points_low]
        y_values = [point['value'] for point in trendline_points_low]
        plt.plot(x_values, y_values, color='green', linestyle='dashed')

    # Plot the significant pivot low points
    x_values = [point['index'] for point in significant_low_points]
    y_values = [point['value'] for point in significant_low_points]
    plt.scatter(x_values, y_values, color='green', label='Significant Pivot Low Points')

    # Set chart title and labels
    plt.title(f'Closing Prices with Trendlines and Significant Pivot Points ({file_name})')
    plt.xlabel('Index')
    plt.ylabel('Closing Price')

    # Show the chart for the current data frame
    plt.legend()
    plt.show()
Run Code Online (Sandbox Code Playgroud)

以上是我按照刚才所说的逻辑写出的新方法,但绘制结果仍然与我们想要的效果有差距。

joh*_*ins 1

这是一个使用 KMeans 聚类和线性回归技术来绘制优化趋势线的示例

簇的数量是硬编码的(可以通过变量 n_clusters 轻松更改)。在更完善的版本中,应当根据数据本身推导出最佳的簇数量(这也是引入 mean_squared_error 的原因,但我最终没有在本演示中用该指标来选择簇数:以它为标准时,总是会把可能的最大簇数选为最优)。应该存在更好的方法来确定理想的簇数量,不过通过几轮试错来手动调整这个参数,也并不困难或耗时。

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go

from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


def fit_line_to_cluster(df, cluster_label):
    """Fit a straight line to one cluster of pivot points.

    Args:
        df: DataFrame with an "index" column (regressor) and a "value"
            column (target).
        cluster_label: Not used by the function body.

    Returns:
        ``(model, mse)``: the fitted LinearRegression and the mean squared
        error of its predictions on the same rows it was fitted on.
    """
    features = df[["index"]]
    targets = df["value"]

    model = LinearRegression()
    model.fit(features, targets)

    fitted_values = model.predict(features)
    return model, mean_squared_error(targets, fitted_values)


def calculate_pivot_points(data):
    """Compute per-row pivot points and locate swing highs/lows.

    Rows are read by position, so the frame may carry any index; the
    "index" stored for a swing point is its 0-based row position.

    Args:
        data: DataFrame with numeric "high", "low" and "close" columns.

    Returns:
        Tuple ``(pivot_points, pivot_high_points, pivot_low_points)`` where
        ``pivot_points`` holds ``(high + low + close) / 3`` for each row and
        the other two lists hold ``{"index": row, "value": price}`` dicts
        for every interior row whose high (low) is strictly above (below)
        both neighbours.
    """
    # Pull the columns out once: scalar ``data.loc[i, col]`` lookups inside
    # the loop are slow and only work when the index is exactly 0..n-1.
    highs = data["high"].to_numpy()
    lows = data["low"].to_numpy()
    closes = data["close"].to_numpy()
    last_row = len(highs) - 1

    pivot_points = []
    pivot_high_points = []
    pivot_low_points = []

    for i, (high, low, close) in enumerate(zip(highs, lows, closes)):
        # Calculate Pivot Point
        pivot_points.append((high + low + close) / 3)

        # The first and last rows have only one neighbour, so they can
        # never qualify as swing points.
        if 0 < i < last_row:
            if high > highs[i - 1] and high > highs[i + 1]:
                pivot_high_points.append({"index": i, "value": high})
            if low < lows[i - 1] and low < lows[i + 1]:
                pivot_low_points.append({"index": i, "value": low})

    return (
        pivot_points,
        pivot_high_points,
        pivot_low_points,
    )


def add_fitted_lines_to_plotly(fig, df, models, colors):
    """Draw one fitted line per cluster onto a Plotly figure.

    Args:
        fig: Figure that receives the traces via ``fig.add_trace``.
        df: DataFrame with "cluster" and "index" columns.
        models: Fitted regressors; the model at position ``k`` is evaluated
            on the "index" values of the rows labelled ``k``.
        colors: Line colors, looked up by the same position ``k``.
    """
    for cluster_id, regressor in enumerate(models):
        members = df[df["cluster"] == cluster_id]
        xs = members["index"].values
        # predict() is given a single-feature 2-D array, hence the reshape.
        ys = regressor.predict(xs.reshape(-1, 1))
        line_trace = go.Scatter(
            x=xs,
            y=ys,
            mode="lines",
            line={"color": colors[cluster_id]},
        )
        fig.add_trace(line_trace)


### Data Analysis
df = pd.read_excel("~/Downloads/data3.xlsx")
df["time"] = pd.to_datetime(df["timestamp"])

pivot_points, pivot_high_points, pivot_low_points = calculate_pivot_points(df)

high_df = pd.DataFrame(pivot_high_points)
low_df = pd.DataFrame(pivot_low_points)


## Clustering
n_clusters = 20


def _cluster_and_fit(points_df, requested_clusters):
    """Cluster pivot points with KMeans and fit one line per cluster.

    The cluster count is clamped to the number of available points, because
    KMeans cannot be fitted with more clusters than samples. A "cluster"
    column holding each row's label is written into ``points_df``.

    Returns:
        ``(kmeans, models)``: the fitted KMeans estimator and a list with
        one fitted regression model per cluster label, in label order.
    """
    cluster_count = min(requested_clusters, len(points_df))
    kmeans = KMeans(n_clusters=cluster_count, random_state=0).fit(
        points_df[["index", "value"]]
    )
    points_df["cluster"] = kmeans.labels_

    models = []
    for label in range(cluster_count):
        members = points_df[points_df["cluster"] == label]
        # The MSE returned alongside the model is not used in this demo.
        model, _ = fit_line_to_cluster(members, label)
        models.append(model)
    return kmeans, models


# For high points
kmeans_high, optimal_high_models = _cluster_and_fit(high_df, n_clusters)

# For low points
kmeans_low, optimal_low_models = _cluster_and_fit(low_df, n_clusters)

closing_prices = df["close"].values


### Plotting
fig = go.Figure()


# Plot closing, high, and low points
# (the x values of every trace are row positions, not timestamps)
fig.add_trace(
    go.Scatter(
        x=list(range(len(closing_prices))),
        y=closing_prices,
        mode="lines",
        name="Closing Prices",
        line=dict(color="blue"),
        opacity=0.75,
    )
)

fig.add_trace(
    go.Scatter(
        x=high_df["index"],
        y=high_df["value"],
        mode="markers",
        name="High Points",
        marker=dict(color="red"),
        opacity=0.75,
        marker_size=5,
    )
)
fig.add_trace(
    go.Scatter(
        x=low_df["index"],
        y=low_df["value"],
        mode="markers",
        name="Low Points",
        marker=dict(color="green"),
        opacity=0.75,
        marker_size=5,
    )
)

# Add optimal trendlines for high and low clusters
add_fitted_lines_to_plotly(
    fig, high_df, optimal_high_models, ["magenta"] * len(optimal_high_models)
)
add_fitted_lines_to_plotly(
    fig, low_df, optimal_low_models, ["cyan"] * len(optimal_low_models)
)

# Set chart title and labels
fig.update_layout(
    title="High and Low Pivot Points with Clustered Trendlines",
    xaxis_title="Time",
    yaxis_title="Price",
)


fig.show()
Run Code Online (Sandbox Code Playgroud)

结果是: 绘制主图,显示带有趋势线的所有数据 使用 Plotly,它允许放大数据,例如: 变焦 #1 变焦 #2