在ScalaActors.pdf上的这个幻灯片放映中,单个引号指示何时将消息发送给pong actor?
class Ping(count: int, pong: Pong) extends Actor {
def act() {
pong ! 'Ping // what does the single quote indicate???
receive {
case 'Pong =>
}
}
}
Run Code Online (Sandbox Code Playgroud) 我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.
结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.
在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.
在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.
我查看了这个示例,例如我在Kafka中的Avro对象是退出复杂的,不能简单地像示例中的String一样进行转换.
到目前为止,我尝试了这种代码(我在这里看到了另一个问题):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
Run Code Online (Sandbox Code Playgroud)
我得到"数据类型不匹配:无法将BinaryType转换为StructType(StructField(...."
我怎样才能反序化值?
我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。
spark-avro外部模块可以为读取avro文件提供以下解决方案:
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Run Code Online (Sandbox Code Playgroud)
但是,我需要阅读流式Avro消息。库文档建议使用from_avro()函数,该函数仅适用于Scala和Java。
是否有其他模块支持读取从Kafka流式传输的Avro消息?
我想从融合的云主题读取数据,然后写入另一个主题。
在本地主机上,我没有遇到任何重大问题。但是confluent cloud的schema registry需要传递一些我不知道如何输入的身份验证数据:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=:
schema.registry.url= https://xxxxxxxxxx.confluent.cloudBlockquote
以下是当前代码:
import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession
object AvroConsumer {
private val topic = "transactions"
private val kafkaUrl = "http://localhost:9092"
private val schemaRegistryUrl = "http://localhost:8081"
private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("ConfluentConsumer")
.master("local[*]")
.getOrCreate() …Run Code Online (Sandbox Code Playgroud) apache-spark databricks confluent-schema-registry spark-structured-streaming confluent-platform
My Kafka 和 Schema Registry 基于 Confluent 社区平台 5.2.2,My Spark 有 2.4.4 版本。我开始使用 Spark REPL env:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4
Run Code Online (Sandbox Code Playgroud)
并为 spark 会话设置 Kafka 源:
val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerServers)
.option("subscribe", topicName)
.load()
Run Code Online (Sandbox Code Playgroud)
我得到了关于键和值的模式信息:
import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")
Run Code Online (Sandbox Code Playgroud)
首先,如果我用 writeStream 查询“ key ”,即
import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame …Run Code Online (Sandbox Code Playgroud) avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming