相关疑难解决方法(0)

从任务中调用Java/Scala函数

背景

我原来的问题是为什么使用DecisionTreeModel.predict内部地图功能会引发异常?并且与如何使用MLlib在Spark上生成(原始标签,预测标签)的元组有关?

当我们使用Scala API时,推荐RDD[LabeledPoint]使用预测的方法DecisionTreeModel是简单地映射RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
Run Code Online (Sandbox Code Playgroud)

遗憾的是,PySpark中的类似方法效果不佳:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()
Run Code Online (Sandbox Code Playgroud)

例外:您似乎尝试从广播变量,操作或转换引用SparkContext.SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用.有关更多信息,请参阅SPARK-5063.

而不是官方文档推荐这样的东西:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
Run Code Online (Sandbox Code Playgroud)

那么这里发生了什么?此处没有广播变量,Scala API定义predict如下:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing …
Run Code Online (Sandbox Code Playgroud)

python scala apache-spark pyspark apache-spark-mllib

37
推荐指数
1
解决办法
9913
查看次数

SPARK SQL替换mysql GROUP_CONCAT聚合函数

我有一个包含两个字符串类型列(用户名,朋友)的表,对于每个用户名,我想在一行中收集所有朋友,连接为字符串('username1','friends1,friends2,friends3').我知道MySql通过GROUP_CONCAT做到这一点,有没有办法用SPARK SQL做到这一点?

谢谢

aggregate-functions apache-spark apache-spark-sql

30
推荐指数
4
解决办法
3万
查看次数

在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)

我有这个在pandas数据帧中本地运行的python代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.

我尝试过以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 
Run Code Online (Sandbox Code Playgroud)

返回

KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)

我推测因为'A'不再是一列而我找不到x.name的等价物.

然后

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()
Run Code Online (Sandbox Code Playgroud)

但是得到以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)

任何建议将非常感谢!

python user-defined-functions apache-spark apache-spark-sql pyspark

27
推荐指数
4
解决办法
2万
查看次数

如何在Pyspark中使用Scala类

如果有任何方法可以使用Scala课程Pyspark,我一直在寻找一段时间,而且我没有找到任何关于这个主题的文档或指南.

假设我创建了一个简单的类,Scala它使用了一些库apache-spark,例如:

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}
Run Code Online (Sandbox Code Playgroud)
  • 有没有可能的方法来使用这个类Pyspark
  • 太难了吗?
  • 我必须创建一个.py文件吗?
  • 是否有任何指南说明如何做到这一点?

顺便说一句,我也查看了spark代码,感觉有点迷失,我无法为自己的目的复制它们的功能.

python scala apache-spark apache-spark-sql pyspark

19
推荐指数
2
解决办法
8355
查看次数

Python API中是否提供Spark SQL UDAF(用户定义的聚合函数)?

从Spark 1.5.0开始,似乎可以编写自己的UDAF用于DataFrames上的自定义聚合: Spark 1.5 DataFrame API要点:日期/时间/字符串处理,时间间隔和UDAF

但是,我不清楚Python API是否支持此功能?

apache-spark apache-spark-sql spark-dataframe

13
推荐指数
1
解决办法
5532
查看次数

在pyspark中包装java函数

我正在尝试创建一个用户定义的聚合函数,我可以从python中调用它.我试着按照这个问题的答案.我基本上实现了以下(取自这里):

package com.blu.bla;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.Row;

public class MySum extends UserDefinedAggregateFunction {
    private StructType _inputDataType;
    private StructType _bufferSchema;
    private DataType _returnDataType;

    public MySum() {
        List<StructField> inputFields = new ArrayList<StructField>();
        inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true));
        _inputDataType = DataTypes.createStructType(inputFields);

        List<StructField> bufferFields = new ArrayList<StructField>();
        bufferFields.add(DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true));
        _bufferSchema = DataTypes.createStructType(bufferFields);

        _returnDataType = DataTypes.DoubleType;
    }

    @Override public StructType inputSchema() {
        return _inputDataType;
    }

    @Override public StructType …
Run Code Online (Sandbox Code Playgroud)

python java apache-spark pyspark

6
推荐指数
1
解决办法
5794
查看次数

将自定义函数应用于spark数据帧组

我有一个非常大的时间序列数据表,其中包含以下列:

  • 时间戳
  • LicensePlate
  • UberRide#
  • 速度

应考虑整组数据处理每个LicensePlate/UberRide数据集合.换句话说,我不需要逐行处理数据,而是将所有行按(LicensePlate/UberRide)一起分组.

我计划在数据帧api中使用spark,但我对如何在spark分组数据帧上执行自定义计算感到困惑.

我需要做的是:

  1. 获取所有数据
  2. 按一些列分组
  3. Foreach spark数据帧组应用af(x).返回自定义对象foreach组
  4. 通过应用g(x)并返回单个自定义对象来获取结果

我该怎么做第3步和第4步?我应该使用哪些关于spark API(数据帧,数据集,rdd,也许是pandas ......)的提示?

整个工作流程如下:

工作流程

group-by dataset dataframe apache-spark pyspark

6
推荐指数
2
解决办法
1万
查看次数

PySpark数据帧上的自定义聚合

我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量

例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]

我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.

我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.

aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
6030
查看次数

如何在PySpark中使用Scala UDF?

我希望能够将Scala函数用作PySpark中的UDF

package com.test

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
} 
Run Code Online (Sandbox Code Playgroud)

我可以testFunction1在PySpark 中访问它并返回值:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)
Run Code Online (Sandbox Code Playgroud)

我想要做的就是将此函数用作UDF,最好是在withColumn通话中使用:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
Run Code Online (Sandbox Code Playgroud)

我认为这里有一个很有前途的方法: Spark:如何用Scala或Java用户定义函数映射Python?

但是,在对其中的代码进行更改时,可以改为使用testUDFFunction1

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)

我得到:

 AttributeError: 'JavaMember' object has no attribute 'apply' 
Run Code Online (Sandbox Code Playgroud)

我不明白这是因为我相信testUDFFunction1有申请方法吗?

我不想使用在这里找到的类型的表达式:将 UDF从Scala注册到SqlContext,以便在PySpark中使用

任何有关如何使这项工作的建议,将不胜感激!

python scala apache-spark apache-spark-sql pyspark

6
推荐指数
2
解决办法
3331
查看次数

从Scala注册UDF到SqlContext以在PySpark中使用

是否可以注册用Scala编写的UDF(或函数)在PySpark中使用?例如:

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
Run Code Online (Sandbox Code Playgroud)

在Scala中,现在可以使用以下内容:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中使用"UDFaddOne"

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Run Code Online (Sandbox Code Playgroud)

背景:我们是一个开发人员团队,一些用Scala编写,一些用Python编写,并且想分享已编写的函数.也可以将其保存到库中并导入它.

scala user-defined-functions apache-spark pyspark apache-zeppelin

5
推荐指数
2
解决办法
2972
查看次数

使用Scala转换PySpark RDD

TL; DR - 我在PySpark应用程序中看起来像字符串的DStream.我想将它作为DStream[String]一个Scala库发送.但是,字符串不会被Py4j转换.

我正在开发一个PySpark应用程序,它使用Spark Streaming从Kafka中提取数据.我的消息是字符串,我想在Scala代码中调用一个方法,并传递一个DStream[String]实例.但是,我无法在Scala代码中接收正确的JVM字符串.在我看来,Python字符串不会转换为Java字符串,而是序列化.

我的问题是:如何从DStream对象中获取Java字符串?


这是我提出的最简单的Python代码:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()
Run Code Online (Sandbox Code Playgroud)

我在PySpark中运行此代码,将其路径传递给我的JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)

在Scala方面,我有:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,假设我将一些数据发送到Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Run Code Online (Sandbox Code Playgroud)

printlnScala代码中的语句打印出如下内容:

[B@758aa4d9 …
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark

5
推荐指数
1
解决办法
1529
查看次数

用户定义的功能要应用于PySpark中的Window?

我试图将用户定义的函数应用于PySpark中的Window。我已经读过UDAF也许是要走的路,但是我找不到任何具体的东西。

举个例子(摘自:Xinh的技术博客,并针对PySpark进行了修改):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                    ["Alice", "2016-05-03", 45.00],
                                    ["Alice", "2016-05-04", 55.00],
                                    ["Bob", "2016-05-01", 25.00],
                                    ["Bob", "2016-05-04", 29.00],
                                    ["Bob", "2016-05-06", 27.00]],
                               ["name", "date", "amountSpent"])

customers.show()

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()
Run Code Online (Sandbox Code Playgroud)

我正在应用avg已经内置的函数,pyspark.sql.functions但是如果avg我不想使用更复杂的函数并编写自己的函数,该怎么办?

aggregate-functions user-defined-functions window-functions apache-spark pyspark

5
推荐指数
2
解决办法
4006
查看次数

PySpark 2.2 爆炸删除空行(如何实现explode_outer)?

我正在 PySpark 数据框中处理一些深度嵌套的数据。当我尝试将结构展平为行和列时,我注意到当我调用withColumn该行是否包含null在源列中时,该行将从我的结果数据框中删除。相反,我想找到一种方法来保留该行并null在结果列中包含该行。

要使用的示例数据框:

from pyspark.sql.functions import explode, first, col, monotonically_increasing_id
from pyspark.sql import Row

df = spark.createDataFrame([
  Row(dataCells=[Row(posx=0, posy=1, posz=.5, value=1.5, shape=[Row(_type='square', _len=1)]), 
                 Row(posx=1, posy=3, posz=.5, value=4.5, shape=[]), 
                 Row(posx=2, posy=5, posz=.5, value=7.5, shape=[Row(_type='circle', _len=.5)])
    ])
])
Run Code Online (Sandbox Code Playgroud)

我还有一个用于扁平结构的函数:

def flatten_struct_cols(df):
    flat_cols = [column[0] for column in df.dtypes if 'struct' not in column[1][:6]]
    struct_columns = [column[0] for column in df.dtypes if 'struct' in column[1][:6]]

    df = df.select(flat_cols +
                   [col(sc + '.' + c).alias(sc + '_' …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

2
推荐指数
1
解决办法
2799
查看次数