标签: pyspark

使用列的长度过滤DataFrame

我想DataFrame使用与列长度相关的条件来过滤a ,这个问题可能很容易,但我没有在SO中找到任何相关的问题.

更具体的,我有一个DataFrame只有一个Column,其中ArrayType(StringType()),我要筛选的DataFrame使用长度filterer,我拍下面的一个片段.

df = sqlContext.read.parquet("letters.parquet")
df.show()

# The output will be 
# +------------+
# |      tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# |   [S, U, S]|
# +------------+

# But I want only the entries with length …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

35
推荐指数
2
解决办法
5万
查看次数

多个RDD的Spark联合

在我的猪代码中,我这样做:

all_combined = Union relation1, relation2, 
    relation3, relation4, relation5, relation 6.
Run Code Online (Sandbox Code Playgroud)

我想用火花做同样的事情.然而,不幸的是,我发现我必须继续这样做:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
Run Code Online (Sandbox Code Playgroud)

是否有一个联合运算符可以让我一次操作多个rdds:

例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)

这是一个方便的问题.

python apache-spark rdd pyspark

35
推荐指数
1
解决办法
6万
查看次数

将StringIndexer应用于PySpark Dataframe中的多个列

我有一个PySpark数据帧

+-------+--------------+----+----+
|address|          date|name|food|
+-------+--------------+----+----+
|1111111|20151122045510| Yin|gre |
|1111111|20151122045501| Yin|gre |
|1111111|20151122045500| Yln|gra |
|1111112|20151122065832| Yun|ddd |
|1111113|20160101003221| Yan|fdf |
|1111111|20160703045231| Yin|gre |
|1111114|20150419134543| Yin|fdf |
|1111115|20151123174302| Yen|ddd |
|2111115|      20123192| Yen|gre |
+-------+--------------+----+----+
Run Code Online (Sandbox Code Playgroud)

我想转换为与pyspark.ml一起使用.我可以使用StringIndexer将name列转换为数字类别:

indexer = StringIndexer(inputCol="name", outputCol="name_index").fit(df)
df_ind = indexer.transform(df)
df_ind.show()
+-------+--------------+----+----------+----+
|address|          date|name|name_index|food|
+-------+--------------+----+----------+----+
|1111111|20151122045510| Yin|       0.0|gre |
|1111111|20151122045501| Yin|       0.0|gre |
|1111111|20151122045500| Yln|       2.0|gra |
|1111112|20151122065832| Yun|       4.0|ddd |
|1111113|20160101003221| Yan|       3.0|fdf |
|1111111|20160703045231| Yin|       0.0|gre |
|1111114|20150419134543| Yin|       0.0|fdf |
|1111115|20151123174302| Yen|       1.0|ddd |
|2111115| …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

35
推荐指数
2
解决办法
2万
查看次数

Spark 1.4增加了maxResultSize内存

我正在使用Spark 1.4进行研究并在内存设置方面苦苦挣扎.我的机器有16GB的内存所以没有问题,因为我的文件大小只有300MB.虽然,当我尝试使用toPandas()函数将Spark RDD转换为panda数据帧时,我收到以下错误:

serialized results of 9 tasks (1096.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)

我试图修复这个改变spark-config文件并仍然得到相同的错误.我听说这是火花1.4的一个问题,并想知道你是否知道如何解决这个问题.任何帮助深表感谢.

python memory apache-spark pyspark jupyter

34
推荐指数
4
解决办法
3万
查看次数

PySpark:java.lang.OutofMemoryError:Java堆空间

我最近在我的服务器上使用PySpark与Ipython一起使用24个CPU和32GB RAM.它只能在一台机器上运行.在我的过程中,我想收集大量数据,如下面的代码所示:

train_dataRDD = (train.map(lambda x:getTagsAndText(x))
.filter(lambda x:x[-1]!=[])
.flatMap(lambda (x,text,tags): [(tag,(x,text)) for tag in tags])
.groupByKey()
.mapValues(list))
Run Code Online (Sandbox Code Playgroud)

当我做

training_data =  train_dataRDD.collectAsMap()
Run Code Online (Sandbox Code Playgroud)

它给了我outOfMemory错误.Java heap Space.此外,我在此错误后无法对Spark执行任何操作,因为它失去了与Java的连接.它给出了Py4JNetworkError: Cannot connect to the java server.

看起来堆空间很小.如何将其设置为更大的限制?

编辑:

我在跑步之前尝试过的事情: sc._conf.set('spark.executor.memory','32g').set('spark.driver.memory','32g').set('spark.driver.maxResultsSize','0')

我按照此处的文档更改了spark选项(如果你执行ctrl-f并搜索spark.executor.extraJavaOptions):http://spark.apache.org/docs/1.2.1/configuration.html

它说我可以通过设置spark.executor.memory选项来避免OOM.我做了同样的事情,但似乎没有工作.

java heap-memory out-of-memory apache-spark pyspark

34
推荐指数
2
解决办法
4万
查看次数

Spark RDD到DataFrame python

我试图将Spark RDD转换为DataFrame.我已经看到了将方案传递给sqlContext.CreateDataFrame(rdd,schema)函数的文档和示例 .

但我有38列或字段,这将进一步增加.如果我手动给出指定每个字段信息的模式,那将会是如此繁琐的工作.

有没有其他方法可以在不知道先前列的信息的情况下指定模式.

python apache-spark pyspark spark-dataframe

34
推荐指数
1
解决办法
8万
查看次数

Spark使用上一行的值向数据框添加新列

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)

我设法通过以下方式将新列"附加"到数据框中: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.

任何帮助,将不胜感激.

python dataframe apache-spark apache-spark-sql pyspark

33
推荐指数
1
解决办法
2万
查看次数

如何在ipython中将Spark RDD转换为pandas数据帧?

我有一个RDD,我想将其转换为pandas dataframe.我知道要转换,我们可以做到RDD正常dataframe

df = rdd1.toDF()
Run Code Online (Sandbox Code Playgroud)

但我想转换RDDpandas dataframe而不是正常dataframe.我该怎么做?

python ipython pandas rdd pyspark

32
推荐指数
2
解决办法
7万
查看次数

Spark功能与UDF性能有关?

Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?

这是我的测试代码(在Databricks社区上运行):

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …
Run Code Online (Sandbox Code Playgroud)

performance user-defined-functions apache-spark apache-spark-sql pyspark

32
推荐指数
2
解决办法
2万
查看次数

如何将Vector拆分为列 - 使用PySpark

上下文:我有DataFrame2列:单词和向量.其中"vector"的列类型是VectorUDT.

一个例子:

word    |  vector
assert  | [435,323,324,212...]
Run Code Online (Sandbox Code Playgroud)

我希望得到这个:

word   |  v1 | v2  | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
Run Code Online (Sandbox Code Playgroud)

题:

如何使用PySpark为每个维度拆分包含多列向量的列?

提前致谢

python apache-spark apache-spark-sql pyspark apache-spark-ml

32
推荐指数
1
解决办法
2万
查看次数