标签: kafka-producer-api

是否可以使用Kafka传输文件?

我每天都会生成数千个文件,我希望使用Kafka进行流式传输.当我尝试读取文件时,每一行都被视为单独的消息.

我想知道如何在Kafka主题中将每个文件的内容作为单个消息进行处理,并且消费者如何在单独的文件中将Kafka主题中的每个消息写入.

apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
1
解决办法
6866
查看次数

保留期后卡夫卡抵消

我有一个带有1个分区的kafka主题.如果它中有100条消息,则偏移量将为0.99.

根据kafka保留策略,所有消息将在指定的时间段后消失.

一旦所有消息被删除(保留期后),我将向该主题发送100条新消息.现在,消息的新偏移量从哪里开始?是100还是0?

我想知道新的补偿是100-199还是0-99?

apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
1
解决办法
878
查看次数

Apache Kafka Producer配置:'request.timeout.ms'VS.'max.block.ms'属性

鉴于以下同步kafka生产者

Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();
Run Code Online (Sandbox Code Playgroud)

帮助我理解request.timeout.msmax.block.ms生产者配置之间的区别.是否包括所有重试的最长时间?或者每次重试都有自己的超时?

apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
5170
查看次数

在Kafka 0.11中sendOffsetsToTransaction的含义

新的Kafka版本(0.11)只支持一次语义.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

我在java中有一个使用kafka事务代码的生成器设置,就像这样.

producer.initTransactions();
    try {
        producer.beginTransaction();
        for (ProducerRecord<String, String> record : payload) {
            producer.send(record);
        }

        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
            {
                put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
            }
        };
        producer.sendOffsetsToTransaction(groupCommit, "groupId");
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
Run Code Online (Sandbox Code Playgroud)

我不太确定如何使用sendOffsetsToTransaction以及它的预期用例.AFAIK,消费者群体是消费者端的多线程阅读功能.

贾瓦多克说

"发送消耗的偏移列表的消费群协调,也标志着这些偏移作为当前事务的一部分.这些偏移仅如果交易成功提交才算消耗.当你需要批量消费应该用这种方法并且一起产生消息,通常是消费转换产生模式."

如何生成维护消耗的偏移列表?什么意思呢?

java multithreading apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
1106
查看次数

如何安装适用于 PHP 的 Kafka 扩展?

这是我尝试安装的扩展: https: //github.com/EVODelavega/phpkafka

传递到队列的消息应采用 JSON 格式。

目前,我遇到安装错误: 1. 说明要求我安装librdkafka。2.上述步骤的安装链接是这个。我无法使用第一种和第四种方法安装。这是错误:

checking for librdkafka/rdkafka.h" in default path... not found
configure: error: Please reinstall the rdkafka distribution
Run Code Online (Sandbox Code Playgroud)

php apache-kafka kafka-consumer-api kafka-producer-api

7
推荐指数
3
解决办法
3万
查看次数

获取相关 ID 为 92 的元数据时出错:{myTest=UNKNOWN_TOPIC_OR_PARTITION}

我创建了一个示例应用程序来检查我的生产者的代码。当我在没有分区键的情况下发送数据时,我的应用程序运行良好。但是,在指定数据分区的键时,我收到错误:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
Run Code Online (Sandbox Code Playgroud)

对于消费者和生产者来说。我在互联网上搜索了很多,他们建议验证 kafka.acl 设置。我在 HDInsight 上使用 kafka,但我不知道如何验证它并解决此问题。

我的集群有以下配置:

  1. 头节点:2
  2. 工作节点:4
  3. 动物园管理员:3

我的生产者代码:

public static void produce(String brokers, String topicName) throws IOException{

    // Set properties …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api

7
推荐指数
1
解决办法
3万
查看次数

kafka-python:以 0 vs inf 秒超时关闭 kafka 生产者

我正在尝试使用 python 2.7 使用 kafka-python 2.0.1 生成 Kafka 主题的消息(由于一些与工作场所相关的限制,无法使用 Python 3)

我在单独的环境中创建了一个类,并编译了该包并安装在虚拟环境中:

import json
from kafka import KafkaProducer


class KafkaSender(object):
    def __init__(self):
        self.producer = self.get_kafka_producer()

    def get_kafka_producer(self):
        return KafkaProducer(
            bootstrap_servers=['locahost:9092'],
            value_serializer=lambda x: json.dumps(x),
            request_timeout_ms=2000,
        )

    def send(self, data):
        self.producer.send("topicname", value=data)
Run Code Online (Sandbox Code Playgroud)

我的驱动程序代码是这样的:

from mypackage import KafkaSender

# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Run Code Online (Sandbox Code Playgroud)

场景1:
我运行这段代码,它运行得很好,没有错误,但消息没有推送到主题。我已经确认这一点,因为该主题中的偏移或滞后没有增加。此外,消费者端没有记录任何内容。

场景 2:
从方法中注释/删除 Kafka 生产者的初始化__init__
我将发送行从 更改为 self.producer.send("topicname", value=data) ie self.get_kafka_producer().send("topicname", value=data),不是提前(在类初始化期间)而是在将消息发送到主题之前创建 kafka 生产者。当我运行代码时,它运行得很好。该消息已发布到该主题。

我使用场景 1 的目的是创建一个 Kafka 生产者一次并使用它多次,而不是每次我想发送消息时都创建 Kafka …

python-2.7 kafka-python kafka-producer-api

7
推荐指数
0
解决办法
4994
查看次数

Kafka 分区和 Kafka 副本有什么区别?

我创建了 3 个 Kafka 经纪人设置,经纪人 ID 为 20、21、22。然后我创建了这个主题:

bin/kafka-topics.sh --zookeeper localhost:2181 \
  --create --topic zeta --partitions 4 --replication-factor 3
Run Code Online (Sandbox Code Playgroud)

结果是:

在此输入图像描述

当生产者向主题 zeta 发送消息“hello world”时,Kafka 首先将消息写入哪个分区?

“hello world”消息会在所有 4 个分区中复制吗?

3 个代理中的每个代理都包含所有 4 个分区?这与上述上下文中的复制因子 3 有何关系?

如果我有 8 个在自己的进程或线程中并行运行的消费者订阅了 zeta 主题,Kafka 如何分配分区或代理来并行服务这些消费者?

apache-kafka kafka-producer-api

7
推荐指数
2
解决办法
5207
查看次数

无法从 Kafka 中的消费者向死信主题发送消息

“我正在尝试将消息路由到 Kafka 中的死信主题,以防处理相应消息时出现任何失败。我已经为此功能设置了 SeektoCurrentErrorHandler 和 DeadLetterPublishingRecoverer。

消费者在执行此操作时抛出以下异常:

2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:

org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.

2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer    : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

7
推荐指数
1
解决办法
3628
查看次数

Kafka:Sarama、幂等性和 transactional.id

Shopify/sarama是否提供类似于transactional.idJVM API的选项?

该库支持幂等(Config.Producer.Idemponent,类似于enable.idempotence),但我不明白如何在没有transactional.id.

如果我错了,请纠正我,Sarama 中缺少有关这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。

我在源代码和一些测试(例如)中找到了相关属性,但不明白如何在外部使用它们。

go apache-kafka kafka-producer-api sarama

7
推荐指数
1
解决办法
236
查看次数