以 Camel-Quarkus 提供的官方示例为起点,我修改了逻辑,以便写入 Kafka 代理。随着 Camel Kafka 组件指向本地代理,一切都运行良好。
尝试联系我们的 Confluence Cloud 代理时,事情会变得稍微复杂一些。我们使用的安全协议是 SASL_SSL。以下代码片段会导致在该问题末尾添加日志。为了重现,请在此处找到完整的代码https://github.com/LeonardoBonacci/camel-kafka-sasl
final String brokers = "the-kafka-host.confluent.cloud:9092";
final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD";
from("direct:start")
.setBody(exchange -> "I do not arrive")
.log(LoggingLevel.INFO, "Sending to Kafka: ${body}")
.to("kafka:foo-topic?"
+ "brokers=" + brokers
+ "&saslMechanism=PLAIN"
+ "&securityProtocol=SASL_SSL"
+ "&sslEndpointAlgorithm=HTTPS"
+ "&saslJaasConfig=" + saslJaasConfig);
Run Code Online (Sandbox Code Playgroud)
记录的ProducerConfig似乎是正确的。当我在普通 Kafka Producer 中使用相同的凭据时,除了将记录写入 Kafka 主题之外,它还会打印几乎相同的ProducerConfig。这表明配置值已很好地传播到底层生产者。
解释日志后发现 SSL 握手有效。下一步不太成功: SaslClientAuthenticator尝试失败。
从各种相互矛盾的博客文章和官方文档中,我无法推断出 SASL_SSL 是否实际上受到支持。
谁能帮我解决这个问题吗?非常感激!
2020-11-02 07:16:03,244 …
Run Code Online (Sandbox Code Playgroud)