小编vin*_*ana的帖子

如何在java中使用sasl机制PLAIN和安全协议SASL_SSL配置kafka消费者?

我想创建使用安全协议 SASL_SSL 和 sasl Merchanism PLAIN 的 kafka 消费者。有人可以帮我配置这些详细信息吗?

我已经阅读了许多有关如何配置 SASL 详细信息的文档,但仍然没有清楚地了解如何做到这一点。这里我附上我用来创建kafka消费者的代码

Properties props = new Properties();
props.put("bootstrap.servers", "servers");
String consumeGroup = "consumer_group";
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\"");
props.put("group.id", consumeGroup);
props.put("client.id", "client_id");
props.put("security.protocol", "SASL_SSL");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "101");
props.put("max.partition.fetch.bytes", "135");
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",      "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪

    14:56:12.767 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Starting the Kafka consumer
    14:56:12.776 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, kafka-events-nonprod-ds1.i, …
Run Code Online (Sandbox Code Playgroud)

java sasl jaas apache-kafka kafka-consumer-api

8
推荐指数
1
解决办法
2万
查看次数

如何在反序列化过程中不使用无限循环的情况下编写kafka消费者?

如何在java中编写kafka消费者而不使用无限循环进行轮询?

我使用此链接作为参考创建了卡夫卡消费者。这里在处理传入记录函数中编写了 while(true) 循环,其中轮询新事件。如果我在我的项目中使用它,除了这个我无法做任何其他事情。有没有办法避免使用这种无限循环来获取新事件?

 public static void main(String[] str) throws InterruptedException {
    System.out.println("Starting  AtMostOnceConsumer ...");
    execute();
}
private static void execute() throws InterruptedException {
    KafkaConsumer<String, Event> consumer = createConsumer();
    // Subscribe to all partition in that topic. 'assign' could be used here
    // instead of 'subscribe' to subscribe to specific partition.
    consumer.subscribe(Arrays.asList("topic"));
    processRecords(consumer);
}
private static KafkaConsumer<String, Event> createConsumer() {
    Properties props = new Properties();
    String consumeGroup = "group_id";
    props.put("group.id", consumeGroup);
    props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
    props.put("client.id", "clientId");
    props.put("security.protocol", "SASL_SSL");

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

3
推荐指数
1
解决办法
6563
查看次数

使用 Google 应用脚本开发 Gmail 插件时,如何安全地将数据存储在 Google 中?

我正在创建一个 Gmail 插件,我的疑问是如何在 Google 端保存一些安全和机密的信息,以便稍后在我的代码中使用它。Google 是否提供某种安全存储机制?

google-api google-apps-script google-apps-marketplace gmail-addons google-workspace

2
推荐指数
1
解决办法
340
查看次数