我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?
当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)
遗憾的是,PySpark中的类似方法效果不佳:
labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)
例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.
而不是官方文档推荐这样的东西:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)
那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:
/**
* Predict values for a single data point using the model trained.
*
* @param features array representing …Run Code Online (Sandbox Code Playgroud) 我有一个包含两个字符串类型列(用户名,朋友)的表,对于每个用户名,我想在一行中收集所有朋友,连接为字符串('username1','friends1,friends2,friends3').我知道MySql通过GROUP_CONCAT做到这一点,有没有办法用SPARK SQL做到这一点?
谢谢
我有这个在pandas数据帧中本地运行的python代码:
df_result = pd.DataFrame(df
.groupby('A')
.apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)
我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.
我尝试过以下方法:
sparkDF
.groupby('A')
.agg(myFunction(zip('B', 'C'), 'A'))
Run Code Online (Sandbox Code Playgroud)
返回
KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)
我推测因为'A'不再是一列而我找不到x.name的等价物.
然后
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
.toDF()
Run Code Online (Sandbox Code Playgroud)
但是得到以下错误:
AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)
任何建议将非常感谢!
python user-defined-functions apache-spark apache-spark-sql pyspark
如果有任何方法可以使用Scala课程Pyspark,我一直在寻找一段时间,而且我没有找到任何关于这个主题的文档或指南.
假设我创建了一个简单的类,Scala它使用了一些库apache-spark,例如:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
Run Code Online (Sandbox Code Playgroud)
Pyspark?.py文件吗? 顺便说一句,我也查看了spark代码,感觉有点迷失,我无法为自己的目的复制它们的功能.
从Spark 1.5.0开始,似乎可以编写自己的UDAF用于DataFrames上的自定义聚合: Spark 1.5 DataFrame API要点:日期/时间/字符串处理,时间间隔和UDAF
但是,我不清楚Python API是否支持此功能?
我正在尝试创建一个用户定义的聚合函数,我可以从python中调用它.我试着按照这个问题的答案.我基本上实现了以下(取自这里):
package com.blu.bla;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.Row;
public class MySum extends UserDefinedAggregateFunction {
private StructType _inputDataType;
private StructType _bufferSchema;
private DataType _returnDataType;
public MySum() {
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true));
_inputDataType = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true));
_bufferSchema = DataTypes.createStructType(bufferFields);
_returnDataType = DataTypes.DoubleType;
}
@Override public StructType inputSchema() {
return _inputDataType;
}
@Override public StructType …Run Code Online (Sandbox Code Playgroud) 我有一个非常大的时间序列数据表,其中包含以下列:
应考虑整组数据处理每个LicensePlate/UberRide数据集合.换句话说,我不需要逐行处理数据,而是将所有行按(LicensePlate/UberRide)一起分组.
我计划在数据帧api中使用spark,但我对如何在spark分组数据帧上执行自定义计算感到困惑.
我需要做的是:
我该怎么做第3步和第4步?我应该使用哪些关于spark API(数据帧,数据集,rdd,也许是pandas ......)的提示?
整个工作流程如下:
我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量
例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]
我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.
我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.
aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark
我希望能够将Scala函数用作PySpark中的UDF
package com.test
object ScalaPySparkUDFs extends Serializable {
def testFunction1(x: Int): Int = { x * 2 }
def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}
Run Code Online (Sandbox Code Playgroud)
我可以testFunction1在PySpark 中访问它并返回值:
functions = sc._jvm.com.test.ScalaPySparkUDFs
functions.testFunction1(10)
Run Code Online (Sandbox Code Playgroud)
我想要做的就是将此函数用作UDF,最好是在withColumn通话中使用:
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
Run Code Online (Sandbox Code Playgroud)
我认为这里有一个很有前途的方法: Spark:如何用Scala或Java用户定义函数映射Python?
但是,在对其中的代码进行更改时,可以改为使用testUDFFunction1:
def udf_test(col):
sc = SparkContext._active_spark_context
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
return Column(_f(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
我得到:
AttributeError: 'JavaMember' object has no attribute 'apply'
Run Code Online (Sandbox Code Playgroud)
我不明白这是因为我相信testUDFFunction1有申请方法吗?
我不想使用在这里找到的类型的表达式:将 UDF从Scala注册到SqlContext,以便在PySpark中使用
任何有关如何使这项工作的建议,将不胜感激!
是否可以注册用Scala编写的UDF(或函数)在PySpark中使用?例如:
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
Run Code Online (Sandbox Code Playgroud)
在Scala中,现在可以使用以下内容:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
Run Code Online (Sandbox Code Playgroud)
我想在PySpark中使用"UDFaddOne"
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Run Code Online (Sandbox Code Playgroud)
背景:我们是一个开发人员团队,一些用Scala编写,一些用Python编写,并且想分享已编写的函数.也可以将其保存到库中并导入它.
scala user-defined-functions apache-spark pyspark apache-zeppelin
TL; DR - 我在PySpark应用程序中看起来像字符串的DStream.我想将它作为DStream[String]一个Scala库发送.但是,字符串不会被Py4j转换.
我正在开发一个PySpark应用程序,它使用Spark Streaming从Kafka中提取数据.我的消息是字符串,我想在Scala代码中调用一个方法,并传递一个DStream[String]实例.但是,我无法在Scala代码中接收正确的JVM字符串.在我看来,Python字符串不会转换为Java字符串,而是序列化.
我的问题是:如何从DStream对象中获取Java字符串?
这是我提出的最简单的Python代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
Run Code Online (Sandbox Code Playgroud)
我在PySpark中运行此代码,将其路径传递给我的JAR:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)
在Scala方面,我有:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
Run Code Online (Sandbox Code Playgroud)
现在,假设我将一些数据发送到Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Run Code Online (Sandbox Code Playgroud)
printlnScala代码中的语句打印出如下内容:
[B@758aa4d9 …Run Code Online (Sandbox Code Playgroud) 我试图将用户定义的函数应用于PySpark中的Window。我已经读过UDAF也许是要走的路,但是我找不到任何具体的东西。
举个例子(摘自:Xinh的技术博客,并针对PySpark进行了修改):
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])
customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
["Alice", "2016-05-03", 45.00],
["Alice", "2016-05-04", 55.00],
["Bob", "2016-05-01", 25.00],
["Bob", "2016-05-04", 29.00],
["Bob", "2016-05-06", 27.00]],
["name", "date", "amountSpent"])
customers.show()
window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))
result.show()
Run Code Online (Sandbox Code Playgroud)
我正在应用avg已经内置的函数,pyspark.sql.functions但是如果avg我不想使用更复杂的函数并编写自己的函数,该怎么办?
aggregate-functions user-defined-functions window-functions apache-spark pyspark
我正在 PySpark 数据框中处理一些深度嵌套的数据。当我尝试将结构展平为行和列时,我注意到当我调用withColumn该行是否包含null在源列中时,该行将从我的结果数据框中删除。相反,我想找到一种方法来保留该行并null在结果列中包含该行。
要使用的示例数据框:
from pyspark.sql.functions import explode, first, col, monotonically_increasing_id
from pyspark.sql import Row
df = spark.createDataFrame([
Row(dataCells=[Row(posx=0, posy=1, posz=.5, value=1.5, shape=[Row(_type='square', _len=1)]),
Row(posx=1, posy=3, posz=.5, value=4.5, shape=[]),
Row(posx=2, posy=5, posz=.5, value=7.5, shape=[Row(_type='circle', _len=.5)])
])
])
Run Code Online (Sandbox Code Playgroud)
我还有一个用于扁平结构的函数:
def flatten_struct_cols(df):
flat_cols = [column[0] for column in df.dtypes if 'struct' not in column[1][:6]]
struct_columns = [column[0] for column in df.dtypes if 'struct' in column[1][:6]]
df = df.select(flat_cols +
[col(sc + '.' + c).alias(sc + '_' …Run Code Online (Sandbox Code Playgroud)