我想从融合的云主题读取数据,然后写入另一个主题。
在本地主机上,我没有遇到任何重大问题。但是confluent cloud的schema registry需要传递一些我不知道如何输入的身份验证数据:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=:
schema.registry.url= https://xxxxxxxxxx.confluent.cloudBlockquote
以下是当前代码:
import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession
object AvroConsumer {
private val topic = "transactions"
private val kafkaUrl = "http://localhost:9092"
private val schemaRegistryUrl = "http://localhost:8081"
private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("ConfluentConsumer")
.master("local[*]")
.getOrCreate() …Run Code Online (Sandbox Code Playgroud) apache-spark databricks confluent-schema-registry spark-structured-streaming confluent-platform
我正在使用 apache nifi 在 kafka 中存储日志。
我有一些日志行,这取决于我必须将它们发送到主题 kafka 或其他主题的内容。我的问题是有很多主题,因此我必须使用很多处理器。
我认为使用“executescript”,我可以根据日志文本中的条件生成一个动态属性,我可以在publishkafka 处理器的“主题名称”属性中使用该属性。
我从代码开始读取flowfile的内容,以及写一些条件,但不知道如何生成属性。
日志行的一些示例:Jim,18,M,156,Oregon,USA 等 John,55,M,170,Idaho,USA 等
这是我到目前为止所拥有的:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
TextLog = str(Log).split(',')
name = TextLog[0]
age = TextLog[1]
sex = TextLog[2]
if name == 'John' and age == '30':
Topic_A = str(TextLog)
outputStream.write(bytearray((Topic_A).encode('utf-8')))
elif name == 'Max' and age == '25':
Topic_B = str(TextLog)
outputStream.write(bytearray((Topic_B).encode('utf-8')))
elif name …Run Code Online (Sandbox Code Playgroud)