我如何对PySpark程序进行单元测试?

011*_*000 31 python unit-testing apache-spark pyspark

我当前的Java /星火单元测试方法效果(详细点击这里)通过使用JUnit"本地"和运行单元测试实例化一个SparkContext.

必须组织代码以在一个函数中执行I/O,然后使用多个RDD调用另一个函数.

这非常有效.我有一个用Java + Spark编写的高度测试的数据转换.

我可以用Python做同样的事吗?

我如何用Python运行Spark单元测试?

Vik*_*dia 24

我建议也使用py.test.py.test使创建可重用的SparkContext测试夹具变得容易,并使用它来编写简洁的测试功能.您还可以专门设置fixture(例如创建StreamingContext)并在测试中使用其中的一个或多个.

我在这个主题上写了一篇关于Medium的博客文章:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

这是帖子的一个片段:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
    """ test word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results
Run Code Online (Sandbox Code Playgroud)

  • 欢迎来到SO!主要链接答案是皱眉头.(也就是说,作为消失的链接的答案将没有持久的价值.)建议添加一些有用的文本,总结或突出显示链接资源的关键点. (4认同)

ksi*_*ndi 16

如果您正在使用Spark 2.x和,那么这是pytest的解决方案SparkSession.我也在导入第三方包.

import logging

import pytest
from pyspark.sql import SparkSession

def quiet_py4j():
    """Suppress spark logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_session(request):
    """Fixture for creating a spark context."""

    spark = (SparkSession
             .builder
             .master('local[2]')
             .config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
             .appName('pytest-pyspark-local-testing')
             .enableHiveSupport()
             .getOrCreate())
    request.addfinalizer(lambda: spark.stop())

    quiet_py4j()
    return spark


def test_my_app(spark_session):
   ...
Run Code Online (Sandbox Code Playgroud)

请注意,如果使用Python 3,我必须将其指定为PYSPARK_PYTHON环境变量:

import os
import sys

IS_PY2 = sys.version_info < (3,)

if not IS_PY2:
    os.environ['PYSPARK_PYTHON'] = 'python3'
Run Code Online (Sandbox Code Playgroud)

否则你会收到错误:

例外:worker中的Python与驱动程序3.5中的版本不同,PySpark不能运行不同的次要版本.请检查环境变量PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON是否正确设置.

  • 设置PYSPARK_PYTHON正确值的一种稍微简单的方法:`os.environ ['PYSPARK_PYTHON'] = sys.executable` - 这将设置为当前正在运行的python,并且有希望更好地应对venvs (4认同)

Jor*_*tao 9

假设已pyspark安装,则可以在下面的unitTest中使用以下类unittest

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)
        cls.spark = pyspark.SQLContext(cls.sc)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()
Run Code Online (Sandbox Code Playgroud)

例:

class SimpleTestCase(PySparkTestCase):

    def test_with_rdd(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])

    def test_with_df(self):
        df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']], 
                                        schema=['c1', 'c2'])
        self.assertEqual(df.count(), 2)
Run Code Online (Sandbox Code Playgroud)

请注意,这将为每个类创建一个上下文。使用setUp而不是setUpClass获取每个测试的上下文。这通常会增加执行测试的开销,因为创建新的Spark上下文当前很昂贵。


san*_*ton 8

我使用pytest,它允许测试装置,因此您可以实例化一个pyspark上下文并将其注入所有需要它的测试中.有点像

@pytest.fixture(scope="session",
                params=[pytest.mark.spark_local('local'),
                        pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing")
                )
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2)
                )
    request.addfinalizer(lambda: sc.stop())

    sc = SparkContext(conf=conf)
    return sc

def my_test_that_requires_sc(spark_context):
    assert spark_context.textFile('/path/to/a/file').count() == 10
Run Code Online (Sandbox Code Playgroud)

然后,您可以通过调用py.test -m spark_local或在YARN中以本地模式运行测试py.test -m spark_yarn.这对我来说非常好.


Pow*_*ers 5

您可以通过在测试套件中的 DataFrame 上运行您的代码并比较 DataFrame 列相等或两个整个 DataFrame 的相等来测试 PySpark 代码。

奎因项目有几个例子

为测试套件创建 SparkSession

使用此夹具创建一个 tests/conftest.py 文件,以便您可以轻松访问测试中的 SparkSession。

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='session')
def spark():
    return SparkSession.builder \
      .master("local") \
      .appName("chispa") \
      .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

列相等示例

假设您想测试以下从字符串中删除所有非单词字符的函数。

def remove_non_word_characters(col):
    return F.regexp_replace(col, "[^\\w\\s]+", "")
Run Code Online (Sandbox Code Playgroud)

您可以使用chispa库中assert_column_equality定义的函数测试此函数。

def test_remove_non_word_characters(spark):
    data = [
        ("jo&&se", "jose"),
        ("**li**", "li"),
        ("#::luisa", "luisa"),
        (None, None)
    ]
    df = spark.createDataFrame(data, ["name", "expected_name"])\
        .withColumn("clean_name", remove_non_word_characters(F.col("name")))
    assert_column_equality(df, "clean_name", "expected_name")
Run Code Online (Sandbox Code Playgroud)

DataFrame 相等示例

有些功能需要通过比较整个 DataFrame 来测试。这是一个对 DataFrame 中的列进行排序的函数。

def sort_columns(df, sort_order):
    sorted_col_names = None
    if sort_order == "asc":
        sorted_col_names = sorted(df.columns)
    elif sort_order == "desc":
        sorted_col_names = sorted(df.columns, reverse=True)
    else:
        raise ValueError("['asc', 'desc'] are the only valid sort orders and you entered a sort order of '{sort_order}'".format(
            sort_order=sort_order
        ))
    return df.select(*sorted_col_names)
Run Code Online (Sandbox Code Playgroud)

这是您为此函数编写的一个测试。

def test_sort_columns_asc(spark):
    source_data = [
        ("jose", "oak", "switch"),
        ("li", "redwood", "xbox"),
        ("luisa", "maple", "ps4"),
    ]
    source_df = spark.createDataFrame(source_data, ["name", "tree", "gaming_system"])

    actual_df = T.sort_columns(source_df, "asc")

    expected_data = [
        ("switch", "jose", "oak"),
        ("xbox", "li", "redwood"),
        ("ps4", "luisa", "maple"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["gaming_system", "name", "tree"])

    assert_df_equality(actual_df, expected_df)
Run Code Online (Sandbox Code Playgroud)

测试输入/输出

通常最好从 I/O 函数中抽象出代码逻辑,这样它们更容易测试。

假设你有一个这样的函数。

def your_big_function:
    df = spark.read.parquet("some_directory")
    df2 = df.withColumn(...).transform(function1).transform(function2)
    df2.write.parquet("other directory")
Run Code Online (Sandbox Code Playgroud)

最好像这样重构代码:

def all_logic(df):
  return df.withColumn(...).transform(function1).transform(function2)

def your_formerly_big_function:
    df = spark.read.parquet("some_directory")
    df2 = df.transform(all_logic)
    df2.write.parquet("other directory")
Run Code Online (Sandbox Code Playgroud)

像这样设计您的代码可以让您轻松地all_logic使用上面提到的列相等或 DataFrame 相等函数来测试函数。您可以使用模拟来测试your_formerly_big_function. 通常最好避免在测试套件中使用 I/O(但有时不可避免)。


Sai*_*r S 5

pyspark有unittest模块,可以按如下方式使用

from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase

class MySparkTests(PySparkTestCase):
    def spark_session(self):
        return pyspark.SQLContext(self.sc)

    def createMockDataFrame(self):
         self.spark_session().createDataFrame(
            [
                ("t1", "t2"),
                ("t1", "t2"),
                ("t1", "t2"),
            ],
            ['col1', 'col2']
        )
Run Code Online (Sandbox Code Playgroud)