这是另一个从未回答过的论坛上的别人问题的副本,所以我想我会在这里重新提问,因为我有同样的问题.(见http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736)
我在我的机器上正确安装了Spark,并且当使用./bin/pyspark作为我的python解释器时,能够使用pyspark模块运行python程序而不会出错.
但是,当我尝试运行常规Python shell时,当我尝试导入pyspark模块时,我收到此错误:
from pyspark import SparkContext
Run Code Online (Sandbox Code Playgroud)
它说
"No module named pyspark".
Run Code Online (Sandbox Code Playgroud)
我怎样才能解决这个问题?是否需要设置环境变量以将Python指向pyspark headers/libraries/etc. 如果我的火花安装是/ spark /,我需要包含哪些pyspark路径?或者pyspark程序只能从pyspark解释器运行?
我正在尝试运行非常基本的Spark + Python pyspark教程 - 请参阅http://spark.apache.org/docs/0.9.0/quick-start.html
当我尝试初始化一个新的SparkContext时,
from pyspark import SparkContext
sc = SparkContext("local[4]", "test")
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
ValueError: Cannot run multiple SparkContexts at once
Run Code Online (Sandbox Code Playgroud)
我想知道我以前尝试运行示例代码是否会将某些内容加载到内存中并且无法清除.有没有办法列出已经在内存中的当前SparkContexts和/或清除它们以便运行示例代码?
我试图使用pyspark使用Python运行Spark graphx.我的安装看起来是正确的,因为我能够运行pyspark教程和(Java)GraphX教程.大概是因为GraphX是Spark的一部分,pyspark应该能够与它接口,对吗?
以下是pyspark的教程:http ://spark.apache.org/docs/0.9.0/quick-start.html http://spark.apache.org/docs/0.9.0/python-programming-guide. HTML
以下是GraphX的内容:http : //spark.apache.org/docs/0.9.0/graphx-programming-guide.html http://ampcamp.berkeley.edu/big-data-mini-course/graph-分析与- graphx.html
任何人都可以将GraphX教程转换为Python吗?
我需要为包含许多列的数据表生成row_numbers的完整列表.
在SQL中,这将如下所示:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Run Code Online (Sandbox Code Playgroud)
现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)
我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)
(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)
我该怎么做呢?
这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3) …Run Code Online (Sandbox Code Playgroud) 我想更改GGally功能的调色板ggpairs.当我尝试将ggplot命令添加到使用返回的ggplot时getPlot,颜色不会改变.
my_pair_plot = ggpairs(dataset, color="var1")
getPlot(my_pair_plot,2,1) + scale_fill_brewer(palette = "Set2")
Run Code Online (Sandbox Code Playgroud)
尝试将ggplot命令直接放在ggpairs函数上会导致错误.
ggpairs(dataset, color="var1") + scale_fill_brewer(palette = "Set2")
Run Code Online (Sandbox Code Playgroud) 当我运行如下代码时:
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
Run Code Online (Sandbox Code Playgroud)
并且观察Yarn中的阶段,我注意到Spark正在进行DAG计算TWICE - 一次用于实现RDD并将其缓存的distinct + count,然后是完全第二次创建检查点副本.
由于RDD已经实现并缓存,为什么检查点不能简单地利用这一点,并将缓存的分区保存到磁盘?
是否存在一种现有方式(某种配置设置或代码更改)以强制Spark利用此功能并仅运行ONCE操作,并且检查点只会复制内容?
我需要两次"实现"吗?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint
print(newRDD.count())
Run Code Online (Sandbox Code Playgroud)
我已经创建了一个Apache Spark Jira票证,以使其成为一项功能请求:https: //issues.apache.org/jira/browse/SPARK-8666
我有一个RDD,它太大而不能一致地执行一个不同的语句而没有虚假错误(例如,SparkException阶段失败4次,ExecutorLostFailure,HDFS文件系统关闭,最大执行程序失败次数,阶段因SparkContext关闭而被取消,等等)
我试图计算特定列中的不同ID,例如:
print(myRDD.map(a => a._2._1._2).distinct.count())
Run Code Online (Sandbox Code Playgroud)
是否有一种简单,一致,不太随机密集的方式来执行上面的命令,可能使用mapPartitions,reduceByKey,flatMap或其他使用较少shuffle而不是不同的命令?
Apache Spark RDD sortByKey 的 Big-O 时间复杂度是多少?
我正在尝试根据特定顺序将行号分配给 RDD。
假设我有一个 {K,V} 对 RDD,并且我希望使用 key 执行订单
myRDD.sortByKey(true).zipWithIndex
Run Code Online (Sandbox Code Playgroud)
此操作的时间复杂度是多少(以大 O 形式表示)?
幕后发生了什么?冒泡排序?我希望不是!我的数据集非常大并且跨分区运行,所以我很好奇 sortByKey 函数是否是最佳的,或者在分区内执行某种中间数据结构,然后跨分区执行其他操作以优化消息传递,或者什么。
我正在尝试与已经分布在我们集群中的 RDD 结合起来,并在键上进行散列分区。我不需要保留任何排序甚至分区,我只希望联合尽可能快。在这个例子中,我实际上确实想要所有记录,而不仅仅是不同的记录,而是保持多样性。
这是我天真地使用的内容:
val newRDD = tempRDD1.union(tempRDD2)
Run Code Online (Sandbox Code Playgroud)
这是有人向我推荐的更快,因为它利用了 RDD 已经分区和分布的方式:
val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2)
Run Code Online (Sandbox Code Playgroud)
其中哪个更快?结果是否完全一致,在成员方面?
我问这个是因为到目前为止我认为这些方法是等效的,但是当我增加数据的规模和分区、执行程序、内存等的数量时,我得到了 zipPartitions 方法的奇怪结果,这不是之后使用 reduceByKey 正常工作。
也许我的差异是由于我的 RDD 本身,它们具有 ((String, String), (String, Long, Long, Long, Long)) 形式,所以也许 iter++iter2 正在做一些事情而不是联合这些值?
zipPartitions 是否隐式地做了一些额外的事情,比如比较排序,或者重新散列的东西,或者通常以不同于联合的方式实现合并?
如果 RDD 包含非不同行、多个键副本、空分区、键的哈希冲突或任何其他此类问题,union-vs-zipPartitions 会返回不同的结果吗?
是的,我可以自己运行测试(事实上,过去 2 天我已经这样做了!),所以请不要发布任何愚蠢的问题,问我是否尝试过这样那样的......我是提出这个问题是为了更好地了解代码级别的幕后情况。“union”是作为“zipPartitions”的子案例编写的吗?
稍后编辑:按照@Holden 的建议添加一些带有 toDebugString 结果的示例
val tempIntermediateRDD6 = tempIntermediateRDD1.
zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2).
zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory …Run Code Online (Sandbox Code Playgroud) 我有一个使用的生成的镶木地板格式的Hive表
create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
Run Code Online (Sandbox Code Playgroud)
我能够验证它已被填充 - 这是一个示例值
[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
Run Code Online (Sandbox Code Playgroud)
我希望将其放入表单的Spark RDD中
((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
Run Code Online (Sandbox Code Playgroud)
现在,使用spark-shell(我在spark-submit中遇到了同样的问题),我用这些值做了一个测试RDD
scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
Run Code Online (Sandbox Code Playgroud)
使用迭代器,我可以在下面的新RDD中将ArrayBuffer转换为HashSet:
scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, …Run Code Online (Sandbox Code Playgroud)