Que*_*ank 5 python unit-testing pyspark
我有pyspark下面的脚本.我想function在这个脚本中对单元进行单元测试.
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_$'))
new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
Run Code Online (Sandbox Code Playgroud)
我在unittest下面写了一个测试函数.
但我不知道如何提交unittest.我做过spark-submit哪些都没有做任何事情.
import unittest
from my_script import column_names
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = sqlContext.createDataFrame(val, cols)
class RenameColumnNames(unittest.TestCase):
def test_column_names(self):
df1 = column_names(df)
result = df1.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
Run Code Online (Sandbox Code Playgroud)
如何将此脚本集成为一个 unittest
我可以在已pyspark安装的节点上运行此操作吗?
Pyspark Unittests指南
1.您需要从站点下载 Spark发行版并将其解压缩.或者,如果您已经有Spark和Python的工作分发,只需安装pyspark:pip install pyspark
2.如果需要,设置这样的系统变量:
export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6"
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PATH="SPARK_HOME/bin:$PATH"
Run Code Online (Sandbox Code Playgroud)
我在.profile的主目录中添加了这个.如果您已经有Spark的工作分布,则可以设置此变量.
3.此外,您可能需要设置:
PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell"
PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"
Run Code Online (Sandbox Code Playgroud)
Python和罐子?是. Pyspark使用py4j与Spark的java部分进行通信.如果你想解决更复杂的情况,例如在Python中使用测试运行Kafka服务器或者在示例中使用Scala中的TestHiveContext,则应指定jar.我通过Idea运行配置环境变量做到了.
4.And你可以使用pyspark/tests.py,pyspark/streaming/tests.py,pyspark/sql/tests.py,pyspark/ml/tests.py,pyspark/mllib/tests.py脚本至极包含各种测试用例类和实例进行测试pyspark应用.在你的情况下你可以做(例如来自pyspark/sql/tests.py):
class HiveContextSQLTests(ReusedPySparkTestCase):
@classmethod
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =\
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()
@classmethod
def tearDownClass(cls):
ReusedPySparkTestCase.tearDownClass()
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
Run Code Online (Sandbox Code Playgroud)
但是如前所述,您需要在PYSPARK_SUBMIT_ARGS中指定带有Hive库的-jars
或没有蜂巢:
class SQLContextTests(ReusedPySparkTestCase):
def test_get_or_create(self):
sqlCtx = SQLContext.getOrCreate(self.sc)
self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)
Run Code Online (Sandbox Code Playgroud)
据我所知,如果已经安装了pyspark pip,你没有在示例中描述的tests.py.在这种情况下,只需从Spark站点下载分发并复制代码示例.
现在您可以正常运行TestCase: python -m unittest test.py
更新: 因为不推荐使用HiveContext和SqlContext的spark 2.3.您可以使用SparkSession Hive API.
这是测试功能的轻量级方法。您不需要下载 Spark 来运行 PySpark 测试(如接受的答案大纲)。下载 Spark 是一种选择,但不是必需的。这是测试:
import pysparktestingexample.stackoverflow as SO
from chispa import assert_df_equality
import pyspark.sql.functions as F
def test_column_names(spark):
source_data = [
("jose", "oak", "switch")
]
source_df = spark.createDataFrame(source_data, ["some first name", "some.tree.type", "a gaming.system"])
actual_df = SO.column_names(source_df)
expected_data = [
("jose", "oak", "switch")
]
expected_df = spark.createDataFrame(expected_data, ["some_&first_&name", "some_$tree_$type", "a_&gaming_$system"])
assert_df_equality(actual_df, expected_df)
Run Code Online (Sandbox Code Playgroud)
测试使用的SparkSession在tests/conftest.py文件中定义:
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope='session')
def spark():
return SparkSession.builder \
.master("local") \
.appName("chispa") \
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
该测试使用chispaassert_df_equality库中定义的函数。
在 Python 社区中,pytest 通常比 unittest 更受青睐。 这篇博文解释了如何测试 PySpark 程序,并且具有讽刺意味的是,它有一个modify_column_names功能可以让您更优雅地重命名这些列。
def modify_column_names(df, fun):
for col_name in df.columns:
df = df.withColumnRenamed(col_name, fun(col_name))
return df
Run Code Online (Sandbox Code Playgroud)