Testing Spark with pytest - cannot run Spark in local mode

Mrg*_*8m4 2 python pytest apache-spark pyspark

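I am trying to run a word count test with pytest, following this site: Unit testing Apache Spark with py.test. The problem is that I cannot start the Spark context. This is the code I use to create the Spark context: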

@pytest.fixture(scope="session")
def spark_context(request):
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc

I run this code with the following commands:

#first way
pytest spark_context_fixture.py

#second way
python spark_context_fixture.py

Output:

platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0
rootdir: /home/mgr/test, inifile:
collected 0 items

Then I want to run the wordcount test with pytest.

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    """ test word counting
    Args:
        spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

But the output is:

________ ERROR at setup of test_do_word_counts _________
file /home/mgrabowski/test/wordcount_test.py, line 5
  def test_do_word_counts(spark_context):
E       fixture 'spark_context' not found
>       available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
>       use 'pytest --fixtures [testpath]' for help on them.

Does anyone know what causes this problem?

Mrg*_*8m4 5

I did some research and finally found a solution. I use Spark 1.6.

First, I added two lines to my .bashrc file.

export SPARK_HOME=/usr/hdp/2.5.0.0-1245/spark
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
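Before going further, it can help to confirm that both exports took effect in the shell that runs pytest. The following is only a minimal sketch: `check_spark_env` is a hypothetical helper that is not part of the original answer, and it assumes Python 3.4 or newer (for `importlib.util.find_spec`), whereas the question itself was run on Python 2.7.

```python
import importlib.util
import os


def check_spark_env(environ=None):
    """Return a list of problems with the Spark setup; empty means it looks usable."""
    environ = os.environ if environ is None else environ
    problems = []

    spark_home = environ.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif not os.path.isdir(spark_home):
        problems.append("SPARK_HOME is not a directory: " + spark_home)

    # find_spec returns None when the top-level package cannot be located
    # on sys.path (which PYTHONPATH feeds into at interpreter start-up).
    if importlib.util.find_spec("pyspark") is None:
        problems.append("pyspark is not importable; check PYTHONPATH")

    return problems


if __name__ == "__main__":
    for problem in check_spark_env() or ["Spark environment looks usable"]:
        print(problem)
```

Running it in a fresh terminal (so that .bashrc has been re-read) shows whether the variables are visible to Python at all.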

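Then I created the file "conftest.py". The file name really matters, because pytest automatically picks up shared fixtures from files with exactly that name; do not change it, or you will see the spark_context error again. If you use Spark in local mode without YARN, conftest.py should look like this: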

import logging
import pytest

from pyspark import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

@pytest.fixture(scope="session")
def spark_context(request):
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
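    sc = SparkContext(conf=conf)
    # Register the finalizer after the context exists, so teardown
    # never references an unassigned sc.
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc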

@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
    return StreamingContext(spark_context, 1)

Now you can run the tests with a plain pytest command. pytest should start Spark and stop it at the end of the session.
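If pytest still reports that the fixture 'spark_context' was not found, the usual cause is that no conftest.py sits in the test file's directory or in one of its parents. A rough diagnostic, assuming Python 3, is sketched below; `find_conftest_files` is a hypothetical helper that only approximates pytest's lookup (it ignores options such as `--confcutdir`).

```python
import os


def find_conftest_files(test_path, rootdir):
    """List conftest.py files from rootdir down to the test file's directory."""
    rootdir = os.path.abspath(rootdir)
    current = os.path.dirname(os.path.abspath(test_path))
    found = []
    while True:
        candidate = os.path.join(current, "conftest.py")
        if os.path.isfile(candidate):
            found.append(candidate)
        parent = os.path.dirname(current)
        # Stop at rootdir, or at the filesystem root if rootdir is never reached.
        if current == rootdir or parent == current:
            break
        current = parent
    # Outermost directory first, the test file's own directory last.
    return list(reversed(found))


if __name__ == "__main__":
    print(find_conftest_files("wordcount_test.py", "."))
```

An empty list for wordcount_test.py means the fixtures live in a file pytest will not load on its own, which matches the situation in the question, where they were in spark_context_fixture.py.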

If you use YARN, you can change conftest.py to:

import logging
import pytest

from pyspark import HiveContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

@pytest.fixture(scope="session",
            params=[pytest.mark.spark_local('local'),
                    pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing")
                )
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2)
                )
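    sc = SparkContext(conf=conf)
    # Register the finalizer after the context exists, so teardown
    # never references an unassigned sc.
    request.addfinalizer(lambda: sc.stop())

    return sc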

@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
    return StreamingContext(spark_context, 1)

Now you can run the tests in local mode by calling py.test -m spark_local, and in YARN mode by calling py.test -m spark_yarn.

Word count example

In the same folder, create three files: conftest.py (above), wordcount.py:

def do_word_counts(lines):
    counts = (lines.flatMap(lambda x: x.split())
                  .map(lambda x: (x, 1))
                  .reduceByKey(lambda x, y: x+y)
             ) 
    results = {word: count for word, count in counts.collect()}
    return results
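To sanity-check the expected dictionary without starting Spark, the same split-and-count logic can be sketched in plain Python. `do_word_counts_local` is a hypothetical helper, not part of the original answer; it mirrors the flatMap / map / reduceByKey steps on an ordinary list.

```python
from collections import Counter


def do_word_counts_local(lines):
    """Count whitespace-separated words in an iterable of strings."""
    counts = Counter()
    for line in lines:
        # str.split() with no argument splits on runs of whitespace and
        # drops leading/trailing blanks, so padded lines yield no empty words.
        counts.update(line.split())
    return dict(counts)


if __name__ == "__main__":
    sample = [' hello spark ', ' hello again spark spark']
    print(do_word_counts_local(sample))
```

If this helper and the Spark version disagree on the same input, the problem is more likely in the Spark setup than in the counting logic.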

And wordcount_test.py:

import pytest
import wordcount

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

Now you can run the tests by calling pytest.
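When pytest is called without arguments, it collects test files whose names match its default patterns `test_*.py` and `*_test.py`, which is why wordcount_test.py is picked up from the folder. The sketch below checks a file name against those two defaults; `is_collected_by_default` is a hypothetical helper, and it assumes the `python_files` setting has not been changed in a pytest configuration file.

```python
from fnmatch import fnmatch

# pytest's default file name patterns for test discovery.
DEFAULT_PYTHON_FILES = ("test_*.py", "*_test.py")


def is_collected_by_default(filename):
    """Return True if the bare file name matches a default discovery pattern."""
    return any(fnmatch(filename, pattern) for pattern in DEFAULT_PYTHON_FILES)


if __name__ == "__main__":
    for name in ("wordcount_test.py", "wordcount.py", "conftest.py"):
        print(name, is_collected_by_default(name))
```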