
Python unit testing: mocking a pyspark call chain

I want to write some unit tests for a simple method that contains pyspark code.

# `f` refers to pyspark.sql.functions; self.spark is the SparkSession held by the enclosing class
def do_stuff(self, df1: DataFrame, df2_path: str, df1_key: str, df2_key: str) -> DataFrame:
    df2 = self.spark.read.format('parquet').load(df2_path)
    return df1.join(df2, [f.col(df1_key) == f.col(df2_key)], 'left')
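For context, the test below assumes this method lives on a class ClassToTest in a module class_to_test, which owns the SparkSession; neither is shown in the question. A hypothetical sketch of that context (the constructor is a guess, and how the session is actually obtained determines how SparkSession has to be patched):

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as f

class ClassToTest:
    def __init__(self) -> None:
        # Assumption: the class creates/fetches its own session; if it instead
        # receives one from the caller, the patching strategy would differ.
        self.spark = SparkSession.builder.getOrCreate()

    def do_stuff(self, df1: DataFrame, df2_path: str, df1_key: str, df2_key: str) -> DataFrame:
        ...  # the method shown above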

How can I mock the spark read part? I tried this:

@patch("class_to_test.SparkSession")
def test_do_stuff(self, mock_spark: MagicMock) -> None:
    spark = MagicMock()
    spark.read.return_value.format.return_value.load.return_value = \
        self.spark.createDataFrame([(1, 2)], ["key2", "c2"])
    mock_spark.return_value = spark

    input_df = self.spark.createDataFrame([(1, 1)], ["key1", "c1"])
    actual_df = ClassToTest().do_stuff(input_df, "df2", "key1", "key2")
    expected_df = self.spark.createDataFrame([(1, 1, 1, 2)], ["key1", "c1", "key2", "c2"])
    assert_pyspark_df_equal(actual_df, expected_df)

But it fails with the following error:
py4j.Py4JException: Method join([class java.util.ArrayList, class org.apache.spark.sql.Column, class java.lang.String]) does not …
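
A note on mock chaining that may help pinpoint the failure: with unittest.mock, each .return_value corresponds to one call in the chain. Since spark.read is a property (plain attribute access, not a call), the chain spark.read.format('parquet').load(path) maps to spark.read.format.return_value.load.return_value, whereas spark.read.return_value.format... would match spark.read().format(...). A minimal, self-contained sketch of that mapping (no Spark session required; sentinel_df is just a placeholder object):

from unittest.mock import MagicMock

spark = MagicMock()
sentinel_df = object()  # placeholder standing in for a real DataFrame

# spark.read is attribute access, so no .return_value before .format;
# .format(...) and .load(...) are calls, so each contributes one .return_value
spark.read.format.return_value.load.return_value = sentinel_df

assert spark.read.format('parquet').load('some/path') is sentinel_df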

python mocking python-unittest pyspark python-unittest.mock
