Dee*_*arg 3 scala apache-kafka apache-spark kafka-producer-api
我需要帮助使用 kafka 生产者向主题发布消息。我的 kafka 生产者客户端是用运行在 spark 上的 Scala 编写的。
我的工作运行成功,但我的消息似乎没有发布。
这是代码
val response = info.producer.asInstanceOf[KafkaProducer[K, V]].send(new ProducerRecord(info.props.getProperty(s"$topicNickName.topic"), keyMessage._1, keyMessage._2))
Run Code Online (Sandbox Code Playgroud)
生产者配置值
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [x.data.edh:6667, y.data.edh:6667, z.data.edh:6667, a.data.edh:6667, b.data.edh:6667]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0
Run Code Online (Sandbox Code Playgroud)
我如何调试问题?
以下是如何在 Scala 中向 Kafka 生成消息的示例:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
val kafkaProducerProps: Properties = {
val props = new Properties()
props.put("bootstrap.servers", "x.data.edh:6667")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
props
}
val producer = new KafkaProducer[String, String](kafkaProducerProps)
producer.send(new ProducerRecord[String, String]("myTopic", keyMessage._1, keyMessage._2))
Run Code Online (Sandbox Code Playgroud)
如果您想进行流式处理,我建议您查看Spark + Kafka 集成指南。
请注意,上面给出的示例是处于即发即忘模式的 KafkaProducer 。Kafka Producer的使用方式有以下几种:
Fire-and-forget 我们向服务器发送一条消息,并不真正关心它是否成功到达。大多数时候,它会成功到达,因为 Kafka 是高可用的,生产者会自动重试发送消息。但是,使用此方法会丢失一些消息。
同步发送 我们发送一条消息,该
send()方法返回一个 Future 对象,我们get()用来等待未来,看看是否send()成功。异步发送 我们
send()使用一个callback函数调用该方法,当它收到来自 Kafka 代理的响应时被触发
producer.send(record).get()
Run Code Online (Sandbox Code Playgroud)
producer.send(record, new compareProducerCallback)
producer.flush()
// Callback trait only contains the one abstract method onCompletion
private class compareProducerCallback extends Callback {
@Override
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
exception.printStackTrace()
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3758 次 |
| 最近记录: |