我想DataFrame使用与列长度相关的条件来过滤a ,这个问题可能很容易,但我没有在SO中找到任何相关的问题.
更具体的,我有一个DataFrame只有一个Column,其中ArrayType(StringType()),我要筛选的DataFrame使用长度filterer,我拍下面的一个片段.
df = sqlContext.read.parquet("letters.parquet")
df.show()
# The output will be
# +------------+
# | tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# | [S, U, S]|
# +------------+
# But I want only the entries with length …Run Code Online (Sandbox Code Playgroud) 在我的猪代码中,我这样做:
all_combined = Union relation1, relation2,
relation3, relation4, relation5, relation 6.
Run Code Online (Sandbox Code Playgroud)
我想用火花做同样的事情.然而,不幸的是,我发现我必须继续这样做:
first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
Run Code Online (Sandbox Code Playgroud)
是否有一个联合运算符可以让我一次操作多个rdds:
例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)
这是一个方便的问题.
我有一个PySpark数据帧
+-------+--------------+----+----+
|address| date|name|food|
+-------+--------------+----+----+
|1111111|20151122045510| Yin|gre |
|1111111|20151122045501| Yin|gre |
|1111111|20151122045500| Yln|gra |
|1111112|20151122065832| Yun|ddd |
|1111113|20160101003221| Yan|fdf |
|1111111|20160703045231| Yin|gre |
|1111114|20150419134543| Yin|fdf |
|1111115|20151123174302| Yen|ddd |
|2111115| 20123192| Yen|gre |
+-------+--------------+----+----+
Run Code Online (Sandbox Code Playgroud)
我想转换为与pyspark.ml一起使用.我可以使用StringIndexer将name列转换为数字类别:
indexer = StringIndexer(inputCol="name", outputCol="name_index").fit(df)
df_ind = indexer.transform(df)
df_ind.show()
+-------+--------------+----+----------+----+
|address| date|name|name_index|food|
+-------+--------------+----+----------+----+
|1111111|20151122045510| Yin| 0.0|gre |
|1111111|20151122045501| Yin| 0.0|gre |
|1111111|20151122045500| Yln| 2.0|gra |
|1111112|20151122065832| Yun| 4.0|ddd |
|1111113|20160101003221| Yan| 3.0|fdf |
|1111111|20160703045231| Yin| 0.0|gre |
|1111114|20150419134543| Yin| 0.0|fdf |
|1111115|20151123174302| Yen| 1.0|ddd |
|2111115| …Run Code Online (Sandbox Code Playgroud) 我正在使用Spark 1.4进行研究并在内存设置方面苦苦挣扎.我的机器有16GB的内存所以没有问题,因为我的文件大小只有300MB.虽然,当我尝试使用toPandas()函数将Spark RDD转换为panda数据帧时,我收到以下错误:
serialized results of 9 tasks (1096.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)
我试图修复这个改变spark-config文件并仍然得到相同的错误.我听说这是火花1.4的一个问题,并想知道你是否知道如何解决这个问题.任何帮助深表感谢.
我最近在我的服务器上使用PySpark与Ipython一起使用24个CPU和32GB RAM.它只能在一台机器上运行.在我的过程中,我想收集大量数据,如下面的代码所示:
train_dataRDD = (train.map(lambda x:getTagsAndText(x))
.filter(lambda x:x[-1]!=[])
.flatMap(lambda (x,text,tags): [(tag,(x,text)) for tag in tags])
.groupByKey()
.mapValues(list))
Run Code Online (Sandbox Code Playgroud)
当我做
training_data = train_dataRDD.collectAsMap()
Run Code Online (Sandbox Code Playgroud)
它给了我outOfMemory错误.Java heap Space.此外,我在此错误后无法对Spark执行任何操作,因为它失去了与Java的连接.它给出了Py4JNetworkError: Cannot connect to the java server.
看起来堆空间很小.如何将其设置为更大的限制?
编辑:
我在跑步之前尝试过的事情:
sc._conf.set('spark.executor.memory','32g').set('spark.driver.memory','32g').set('spark.driver.maxResultsSize','0')
我按照此处的文档更改了spark选项(如果你执行ctrl-f并搜索spark.executor.extraJavaOptions):http://spark.apache.org/docs/1.2.1/configuration.html
它说我可以通过设置spark.executor.memory选项来避免OOM.我做了同样的事情,但似乎没有工作.
我试图将Spark RDD转换为DataFrame.我已经看到了将方案传递给sqlContext.CreateDataFrame(rdd,schema)函数的文档和示例
.
但我有38列或字段,这将进一步增加.如果我手动给出指定每个字段信息的模式,那将会是如此繁琐的工作.
有没有其他方法可以在不知道先前列的信息的情况下指定模式.
我想知道如何在Spark(Pyspark)中实现以下功能
初始数据帧:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)
结果数据帧:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)
我设法通过以下方式将新列"附加"到数据框中:
df.withColumn("new_Col", df.num * 10)
但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.
任何帮助,将不胜感激.
我有一个RDD,我想将其转换为pandas dataframe.我知道要转换,我们可以做到RDD正常dataframe
df = rdd1.toDF()
Run Code Online (Sandbox Code Playgroud)
但我想转换RDD为pandas dataframe而不是正常dataframe.我该怎么做?
Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?
这是我的测试代码(在Databricks社区上运行):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …Run Code Online (Sandbox Code Playgroud) performance user-defined-functions apache-spark apache-spark-sql pyspark
上下文:我有DataFrame2列:单词和向量.其中"vector"的列类型是VectorUDT.
一个例子:
word | vector
assert | [435,323,324,212...]
Run Code Online (Sandbox Code Playgroud)
我希望得到这个:
word | v1 | v2 | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
Run Code Online (Sandbox Code Playgroud)
题:
如何使用PySpark为每个维度拆分包含多列向量的列?
提前致谢
python apache-spark apache-spark-sql pyspark apache-spark-ml
pyspark ×10
apache-spark ×9
python ×8
dataframe ×2
rdd ×2
heap-memory ×1
ipython ×1
java ×1
jupyter ×1
memory ×1
pandas ×1
performance ×1