使用Scala转换PySpark RDD

Ale*_*rin 5 apache-spark rdd pyspark

TL; DR - 我在PySpark应用程序中看起来像字符串的DStream.我想将它作为DStream[String]一个Scala库发送.但是,字符串不会被Py4j转换.

我正在开发一个PySpark应用程序,它使用Spark Streaming从Kafka中提取数据.我的消息是字符串,我想在Scala代码中调用一个方法,并传递一个DStream[String]实例.但是,我无法在Scala代码中接收正确的JVM字符串.在我看来,Python字符串不会转换为Java字符串,而是序列化.

我的问题是:如何从DStream对象中获取Java字符串?


这是我提出的最简单的Python代码:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()
Run Code Online (Sandbox Code Playgroud)

我在PySpark中运行此代码,将其路径传递给我的JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)

在Scala方面,我有:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,假设我将一些数据发送到Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Run Code Online (Sandbox Code Playgroud)

printlnScala代码中的语句打印出如下内容:

[B@758aa4d9
Run Code Online (Sandbox Code Playgroud)

我期望得到foo bar.

现在,如果我用println以下代码替换Scala代码中的simple 语句:

rdd.foreach(v => println(v.getClass.getCanonicalName))
Run Code Online (Sandbox Code Playgroud)

我明白了:

java.lang.ClassCastException: [B cannot be cast to java.lang.String
Run Code Online (Sandbox Code Playgroud)

这表明字符串实际上是作为字节数组传递的.

如果我只是尝试将这个字节数组转换为字符串(我知道我甚至没有指定编码):

      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }
Run Code Online (Sandbox Code Playgroud)

我得到的东西看起来像(特殊字符可能会被剥离):

?]qXfoo barqa.
Run Code Online (Sandbox Code Playgroud)

这表明Python字符串是序列化的(腌制?).我怎样才能找回一个合适的Java字符串呢?

zer*_*323 7

长话短说没有支持的方式来做这样的事情.不要在生产中尝试这个.你被警告过了.

一般情况下,Spark不会将Py4j用于驱动程序上的某些基本RPC调用,也不会在任何其他计算机上启动Py4j网关.当需要它(主要是MLlib和SQL的某些部分)时,Spark使用Pyrolite来序列化在JVM和Python之间传递的对象.

API的这一部分是私有(Scala)或内部(Python),因此不适用于一般用途.理论上,无论如何,您每批都可以访问它:

package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}
Run Code Online (Sandbox Code Playgroud)

完整的流:

object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}
Run Code Online (Sandbox Code Playgroud)

或将个别批次暴露为DataFrames(可能是最不邪恶的选择):

object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}
Run Code Online (Sandbox Code Playgroud)

并使用这些包装如下:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()
Run Code Online (Sandbox Code Playgroud)

这是不受支持的,未经测试的,因此除了使用Spark API进行实验之外的任何其他内容都没用.