相关疑难解决方法(0)

单个撇号在Scala中意味着什么?

ScalaActors.pdf上的这个幻灯片放映中,单个引号指示何时将消息发送给pong actor?

class Ping(count: int, pong: Pong) extends Actor {
def act() {
   pong ! 'Ping // what does the single quote indicate???
      receive {
         case 'Pong =>
      }
   }
}
Run Code Online (Sandbox Code Playgroud)

scala actor

60
推荐指数
2
解决办法
1万
查看次数

使用Spark 2.0.2读取来自Kafka的Avro消息(结构化流式传输)

我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.

结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.

在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.

在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.

我查看了这个示例,例如我在Kafka中的Avro对象是退出复杂的,不能简单地像示例中的String一样进行转换.

到目前为止,我尝试了这种代码(我在这里看到了另一个问题):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()
Run Code Online (Sandbox Code Playgroud)

我得到"数据类型不匹配:无法将BinaryType转换为StructType(StructField(...."

我怎样才能反序化值?

scala avro apache-kafka spark-streaming apache-spark-2.0

8
推荐指数
1
解决办法
7297
查看次数

Pyspark 2.4.0,使用读取流从kafka读取avro-Python

我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。

spark-avro外部模块可以为读取avro文件提供以下解决方案:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Run Code Online (Sandbox Code Playgroud)

但是,我需要阅读流式Avro消息。库文档建议使用from_avro()函数,该函数仅适用于Scala和Java。

是否有其他模块支持读取从Kafka流式传输的Avro消息?

python avro apache-kafka apache-spark pyspark

4
推荐指数
1
解决办法
1393
查看次数

如何将基本身份验证传递给 Confluent Schema Registry?

我想从融合的云主题读取数据,然后写入另一个主题。

在本地主机上,我没有遇到任何重大问题。但是confluent cloud的schema registry需要传递一些我不知道如何输入的身份验证数据:

basic.auth.credentials.source=USER_INFO

schema.registry.basic.auth.user.info=:

schema.registry.url= https://xxxxxxxxxx.confluent.cloudBlockquote

以下是当前代码:

import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

object AvroConsumer {
  private val topic = "transactions"
  private val kafkaUrl = "http://localhost:9092"
  private val schemaRegistryUrl = "http://localhost:8081"

  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate() …
Run Code Online (Sandbox Code Playgroud)

apache-spark databricks confluent-schema-registry spark-structured-streaming confluent-platform

4
推荐指数
1
解决办法
2827
查看次数

如何将 Confluent Schema Registry 与 from_avro 标准函数一起使用?

My Kafka 和 Schema Registry 基于 Confluent 社区平台 5.2.2,My Spark 有 2.4.4 版本。我开始使用 Spark REPL env:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4
Run Code Online (Sandbox Code Playgroud)

并为 spark 会话设置 Kafka 源:

val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name" 
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
Run Code Online (Sandbox Code Playgroud)

我得到了关于键和值的模式信息:

import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")
Run Code Online (Sandbox Code Playgroud)

首先,如果我用 writeStream 查询“ key ”,即

import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame …
Run Code Online (Sandbox Code Playgroud)

avro apache-kafka apache-spark confluent-schema-registry spark-structured-streaming

2
推荐指数
1
解决办法
2294
查看次数