例如
sqlContext = SQLContext(sc)
sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
Run Code Online (Sandbox Code Playgroud)
上面的语句在终端上打印整个表,但我想使用for或while访问该表中的每一行以执行进一步的计算.
我怎样才能在之后使用collect_set或collect_list使用数据帧groupby.例如:df.groupby('key').collect_set('values').我收到一个错误:AttributeError: 'GroupedData' object has no attribute 'collect_set'
我正试图在macbook air上运行pyspark.当我尝试启动它时,我收到错误:
Exception: Java gateway process exited before sending the driver its port number
Run Code Online (Sandbox Code Playgroud)
当sc =启动时调用SparkContext().我试过运行以下命令:
./bin/pyspark
./bin/spark-shell
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
Run Code Online (Sandbox Code Playgroud)
无济于事.我也看过这里:
Spark + Python - 在向驱动程序发送端口号之前退出Java网关进程?
但问题从未得到解答.请帮忙!谢谢.
我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?
当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)
遗憾的是,PySpark中的类似方法效果不佳:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)
例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.
而不是官方文档推荐这样的东西:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)
那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing …Run Code Online (Sandbox Code Playgroud) 我想用类似SQL的IN子句过滤Pyspark DataFrame ,如
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
Run Code Online (Sandbox Code Playgroud)
a元组在哪里(1, 2, 3).我收到此错误:
java.lang.RuntimeException:[1.67]失败:``('''',但是找到了标识符
这基本上是说它期待类似'(1,2,3)'而不是a.问题是我不能手动写入a中的值,因为它是从另一个作业中提取的.
在这种情况下我该如何过滤?
固定:
为了解决这个问题,我编辑了bash_profile,以确保将java 1.8用作全局默认值,如下所示:
touch ~/.bash_profile; open ~/.bash_profile
Run Code Online (Sandbox Code Playgroud)
新增中
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
Run Code Online (Sandbox Code Playgroud)
并保存在文本编辑中。
更新
由于Oracle的许可证更改,上述修复程序可能无法工作,并且通过brew安装可能会遇到问题。为了安装Java 8,您可能需要遵循本指南。
题:
我正在尝试在Mac上安装Spark。我用自制软件安装了spark 2.4.0和Scala。我已经在Anaconda环境中安装了PySpark,并且正在使用PyCharm进行开发。我已经导出到我的bash个人资料:
export SPARK_VERSION=`ls /usr/local/Cellar/apache-spark/ | sort | tail -1`
export SPARK_HOME="/usr/local/Cellar/apache-spark/$SPARK_VERSION/libexec"
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
Run Code Online (Sandbox Code Playgroud)
但是我无法使其正常工作。
我怀疑这是由于Java版本读取了回溯。我非常感谢您为解决此问题提供的帮助。如果有任何我可以提供的信息,那么请发表评论,这些信息除了回溯之外还有帮助。
我收到以下错误:
Traceback (most recent call last):
File "<input>", line 4, in <module>
File "/anaconda3/envs/coda/lib/python3.6/site-packages/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/anaconda3/envs/coda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/anaconda3/envs/coda/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An …Run Code Online (Sandbox Code Playgroud) 我正在尝试连接两个PySpark数据帧和一些只在每个上面的列:
from pyspark.sql.functions import randn, rand
df_1 = sqlContext.range(0, 10)
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+
df_2 = sqlContext.range(11, 20)
+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))
Run Code Online (Sandbox Code Playgroud)
现在我想生成第三个数据帧.我想要像熊猫这样的东西concat:
df_1.show()
+---+--------------------+--------------------+
| id| uniform| …Run Code Online (Sandbox Code Playgroud) pyspark中有一个DataFrame,数据如下:
user_id object_id score
user_1 object_1 3
user_1 object_1 1
user_1 object_2 2
user_2 object_1 5
user_2 object_2 2
user_2 object_2 6
Run Code Online (Sandbox Code Playgroud)
我期望在每个组中返回具有相同user_id的2条记录,这些记录需要具有最高分.因此,结果应如下所示:
user_id object_id score
user_1 object_1 3
user_1 object_2 2
user_2 object_2 6
user_2 object_1 5
Run Code Online (Sandbox Code Playgroud)
我是pyspark的新手,有人能给我一个代码片段或门户网站来解决这个问题的相关文档吗?十分感谢!
我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.
如果您熟悉SAS,请执行以下操作:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
Run Code Online (Sandbox Code Playgroud)
这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......
PySpark中的Apache Spark中是否存在等效的Pandas Melt函数,或者至少在Scala中?
我到目前为止在python中运行了一个示例数据集,现在我想将Spark用于整个数据集.
提前致谢.