标签: pyspark

如何遍历pyspark中的每一行dataFrame

例如

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
Run Code Online (Sandbox Code Playgroud)

上面的语句在终端上打印整个表,但我想使用for或while访问该表中的每一行以执行进一步的计算.

for-loop dataframe apache-spark apache-spark-sql pyspark

40
推荐指数
5
解决办法
9万
查看次数

pyspark collect_set或collect_list with groupby

我怎样才能在之后使用collect_setcollect_list使用数据帧groupby.例如:df.groupby('key').collect_set('values').我收到一个错误:AttributeError: 'GroupedData' object has no attribute 'collect_set'

group-by list set collect pyspark

40
推荐指数
1
解决办法
5万
查看次数

Pyspark:例外:在向驱动程序发送端口号之前退出Java网关进程

我正试图在macbook air上运行pyspark.当我尝试启动它时,我收到错误:

Exception: Java gateway process exited before sending the driver its port number
Run Code Online (Sandbox Code Playgroud)

当sc =启动时调用SparkContext().我试过运行以下命令:

./bin/pyspark
./bin/spark-shell
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
Run Code Online (Sandbox Code Playgroud)

无济于事.我也看过这里:

Spark + Python - 在向驱动程序发送端口号之前退出Java网关进程?

但问题从未得到解答.请帮忙!谢谢.

python java macos apache-spark pyspark

38
推荐指数
10
解决办法
6万
查看次数

从任务中调用Java/Scala函数

背景

我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?

当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)

遗憾的是,PySpark中的类似方法效果不佳:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)

例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.

而不是官方文档推荐这样的东西:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)

那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing …
Run Code Online (Sandbox Code Playgroud)

python scala apache-spark pyspark apache-spark-mllib

37
推荐指数
1
解决办法
9913
查看次数

使用类似SQL的IN子句过滤Pyspark DataFrame

我想用类似SQL的IN子句过滤Pyspark DataFrame ,如

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
Run Code Online (Sandbox Code Playgroud)

a元组在哪里(1, 2, 3).我收到此错误:

java.lang.RuntimeException:[1.67]失败:``('''',但是找到了标识符

这基本上是说它期待类似'(1,2,3)'而不是a.问题是我不能手动写入a中的值,因为它是从另一个作业中提取的.

在这种情况下我该如何过滤?

python sql dataframe apache-spark pyspark

37
推荐指数
4
解决办法
5万
查看次数

Pyspark错误-不支持的类文件主要版本55

固定:

为了解决这个问题,我编辑了bash_profile,以确保将java 1.8用作全局默认值,如下所示:

touch ~/.bash_profile; open ~/.bash_profile
Run Code Online (Sandbox Code Playgroud)

新增中

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8) 
Run Code Online (Sandbox Code Playgroud)

并保存在文本编辑中。

更新

由于Oracle的许可证更改,上述修复程序可能无法工作,并且通过brew安装可能会遇到问题。为了安装Java 8,您可能需要遵循指南。


题:

我正在尝试在Mac上安装Spark。我用自制软件安装了spark 2.4.0和Scala。我已经在Anaconda环境中安装了PySpark,并且正在使用PyCharm进行开发。我已经导出到我的bash个人资料:

export SPARK_VERSION=`ls /usr/local/Cellar/apache-spark/ | sort | tail -1`
export SPARK_HOME="/usr/local/Cellar/apache-spark/$SPARK_VERSION/libexec"
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
Run Code Online (Sandbox Code Playgroud)

但是我无法使其正常工作。

我怀疑这是由于Java版本读取了回溯。我非常感谢您为解决此问题提供的帮助。如果有任何我可以提供的信息,那么请发表评论,这些信息除了回溯之外还有帮助。

我收到以下错误:

Traceback (most recent call last):
  File "<input>", line 4, in <module>
  File "/anaconda3/envs/coda/lib/python3.6/site-packages/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/anaconda3/envs/coda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/anaconda3/envs/coda/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An …
Run Code Online (Sandbox Code Playgroud)

python java macos apache-spark pyspark

37
推荐指数
4
解决办法
3万
查看次数

连接两个PySpark数据帧

我正在尝试连接两个PySpark数据帧和一些只在每个上面的列:

from pyspark.sql.functions import randn, rand

df_1 = sqlContext.range(0, 10)

+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

df_2 = sqlContext.range(11, 20)

+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))
Run Code Online (Sandbox Code Playgroud)

现在我想生成第三个数据帧.我想要像熊猫这样的东西concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform| …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

36
推荐指数
7
解决办法
10万
查看次数

在pyspark中的每个DataFrame组中检索前n个

pyspark中有一个DataFrame,数据如下:

user_id object_id score
user_1  object_1  3
user_1  object_1  1
user_1  object_2  2
user_2  object_1  5
user_2  object_2  2
user_2  object_2  6
Run Code Online (Sandbox Code Playgroud)

我期望在每个组中返回具有相同user_id的2条记录,这些记录需要具有最高分.因此,结果应如下所示:

user_id object_id score
user_1  object_1  3
user_1  object_2  2
user_2  object_2  6
user_2  object_1  5
Run Code Online (Sandbox Code Playgroud)

我是pyspark的新手,有人能给我一个代码片段或门户网站来解决这个问题的相关文档吗?十分感谢!

python dataframe apache-spark apache-spark-sql pyspark

36
推荐指数
2
解决办法
4万
查看次数

如何将RDD拆分为两个或更多RDD?

我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.

如果您熟悉SAS,请执行以下操作:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;
Run Code Online (Sandbox Code Playgroud)

这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......

apache-spark rdd pyspark

35
推荐指数
2
解决办法
4万
查看次数

如何融化Spark DataFrame?

PySpark中的Apache Spark中是否存在等效的Pandas Melt函数,或者至少在Scala中?

我到目前为止在python中运行了一个示例数据集,现在我想将Spark用于整个数据集.

提前致谢.

melt apache-spark apache-spark-sql pyspark

35
推荐指数
3
解决办法
1万
查看次数