小编xav*_*xav的帖子

如何将基本身份验证传递给 Confluent Schema Registry?

我想从融合的云主题读取数据,然后写入另一个主题。

在本地主机上,我没有遇到任何重大问题。但是confluent cloud的schema registry需要传递一些我不知道如何输入的身份验证数据:

basic.auth.credentials.source=USER_INFO

schema.registry.basic.auth.user.info=:

schema.registry.url= https://xxxxxxxxxx.confluent.cloudBlockquote

以下是当前代码:

import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

object AvroConsumer {
  private val topic = "transactions"
  private val kafkaUrl = "http://localhost:9092"
  private val schemaRegistryUrl = "http://localhost:8081"

  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate() …
Run Code Online (Sandbox Code Playgroud)

apache-spark databricks confluent-schema-registry spark-structured-streaming confluent-platform

4
推荐指数
1
解决办法
2827
查看次数

如何在apache nifi(jython)中使用executescript根据条件生成属性?

我正在使用 apache nifi 在 kafka 中存储日志。

我有一些日志行,这取决于我必须将它们发送到主题 kafka 或其他主题的内容。我的问题是有很多主题,因此我必须使用很多处理器。

我认为使用“executescript”,我可以根据日志文本中的条件生成一个动态属性,我可以在publishkafka 处理器的“主题名称”属性中使用该属性。

我从代码开始读取flowfile的内容,以及写一些条件,但不知道如何生成属性。

日志行的一些示例:Jim,18,M,156,Oregon,USA 等 John,55,M,170,Idaho,USA 等

这是我到目前为止所拥有的:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
         pass
    def process(self, inputStream, outputStream):
         Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
         TextLog = str(Log).split(',')
         name = TextLog[0]
         age = TextLog[1]
         sex = TextLog[2]

         if name == 'John' and age == '30':
             Topic_A = str(TextLog) 
             outputStream.write(bytearray((Topic_A).encode('utf-8')))
         elif name == 'Max' and age == '25':
             Topic_B = str(TextLog)
             outputStream.write(bytearray((Topic_B).encode('utf-8')))
         elif name …
Run Code Online (Sandbox Code Playgroud)

apache-nifi

2
推荐指数
1
解决办法
452
查看次数