小编011*_*000的帖子

我如何对PySpark程序进行单元测试?

我当前的Java /星火单元测试方法效果(详细点击这里)通过使用JUnit"本地"和运行单元测试实例化一个SparkContext.

必须组织代码以在一个函数中执行I/O,然后使用多个RDD调用另一个函数.

这非常有效.我有一个用Java + Spark编写的高度测试的数据转换.

我可以用Python做同样的事吗?

我如何用Python运行Spark单元测试?

python unit-testing apache-spark pyspark

31
推荐指数
6
解决办法
2万
查看次数

Spark on Yarn如何存储洗牌文件?

我正在使用Yarn在Spark中执行过滤器并收到以下错误.任何帮助表示赞赏,但我的主要问题是为什么找不到该文件.

/ HDATA/10 /纱线/纳米/ usercache/spettinato /应用程序缓存/ application_1428497227446_131967 /火花本地20150708124954-AA00/05/merged_shuffle_1_343_1

看起来Spark在被洗牌后找不到存储到HDFS的文件.

为什么Spark访问目录"/ hdata /"?这个目录在HDFS中不存在,它应该是本地目录还是HDFS目录?
我可以配置存储随机数据的位置吗?

15/07/08 12:57:03 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /hdata/10/yarn/nm/usercache/spettinato/appcache/application_1428497227446_131967/spark-local-20150708124954-aa00/05/merged_shuffle_1_343_1 (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

编辑:我想出了一些.spark.local.dir配置的目录是用于将RDD存储到磁盘的本地目录,如http://spark.apache.org/docs/latest/configuration.html所示.

apache-spark

9
推荐指数
2
解决办法
3682
查看次数

redis-py 模块是否可以在集群模式下与 Redis 一起使用?

我正在尝试在集群模式下将redis-py与 redis一起使用,但我无法让它工作。我看到redis-py-cluster可以工作,但是我更喜欢 redis-py,因为我一直在使用它并且它是推荐的 client

python redis redis-py

8
推荐指数
2
解决办法
5785
查看次数

为什么我的任务在Spark UI中的“任务总数”上方成功?

在此处输入图片说明

我以为成功等于失败的总数。

这些数字从何而来?

这些数字表明存在问题,还是根本不重要?

apache-spark

6
推荐指数
1
解决办法
853
查看次数

PySpark DataFrame 是否有像 Pandas 中那样的“管道”功能?

例如在 Pandas 中我会做

data_df = (
     pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3']))
     .pipe(lambda df: df[df.col1 != 'a'])
 )   
Run Code Online (Sandbox Code Playgroud)

这类似于R的管道%>%

PySpark中有类似的东西吗?

python pandas pyspark

5
推荐指数
2
解决办法
4405
查看次数

使用 Redis 与 Sentinel 和 redis-py 时如何故障转移到新的主节点?

订阅Sentinel故障转移,Channel的名称是什么,如何在订阅的函数中检测到需要刷新master?

我有一个使用 Redis Sentinel 实现高可用性和故障转移的多节点 Redis 设置。

我需要为 Redis 设置一个 Pub/Sub 来检测 Redis Master 何时发生故障并且系统何时选举了新的 Master。

_sentinel = redis.sentinel.Sentinel([(app.config["REDIS_HOSTNAME"],app.config["REDIS_SENTINEL_PORT"])])
_master = _sentinel.master_for(app.config["REDIS_SERVICE_NAME"])

def _sentinel_message_handler(message):
    #TODO how do I detect that there is a new Redis Master?

_pubsub = _master.pubsub()
_pubsub.subscribe(**{app.config["TODO"]:_sentinel_message_handler})
Run Code Online (Sandbox Code Playgroud)

python failover high-availability redis redis-py

1
推荐指数
1
解决办法
3345
查看次数