我每天都会生成数千个文件,我希望使用Kafka进行流式传输.当我尝试读取文件时,每一行都被视为单独的消息.
我想知道如何在Kafka主题中将每个文件的内容作为单个消息进行处理,并且消费者如何在单独的文件中将Kafka主题中的每个消息写入.
我有一个带有1个分区的kafka主题.如果它中有100条消息,则偏移量将为0.99.
根据kafka保留策略,所有消息将在指定的时间段后消失.
一旦所有消息被删除(保留期后),我将向该主题发送100条新消息.现在,消息的新偏移量从哪里开始?是100还是0?
我想知道新的补偿是100-199还是0-99?
鉴于以下同步kafka生产者
Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);
KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);
//Send message
producer.send(producerRecord).get();
Run Code Online (Sandbox Code Playgroud)
帮助我理解request.timeout.ms和max.block.ms生产者配置之间的区别.是否包括所有重试的最长时间?或者每次重试都有自己的超时?
新的Kafka版本(0.11)只支持一次语义.
我在java中有一个使用kafka事务代码的生成器设置,就像这样.
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
Run Code Online (Sandbox Code Playgroud)
我不太确定如何使用sendOffsetsToTransaction以及它的预期用例.AFAIK,消费者群体是消费者端的多线程阅读功能.
贾瓦多克说
"发送消耗的偏移列表的消费群协调,也标志着这些偏移作为当前事务的一部分.这些偏移仅如果交易成功提交才算消耗.当你需要批量消费应该用这种方法并且一起产生消息,通常是消费转换产生模式."
如何生成维护消耗的偏移列表?什么意思呢?
这是我尝试安装的扩展: https: //github.com/EVODelavega/phpkafka
传递到队列的消息应采用 JSON 格式。
目前,我遇到安装错误: 1. 说明要求我安装librdkafka。2.上述步骤的安装链接是这个。我无法使用第一种和第四种方法安装。这是错误:
checking for librdkafka/rdkafka.h" in default path... not found
configure: error: Please reinstall the rdkafka distribution
Run Code Online (Sandbox Code Playgroud) 我创建了一个示例应用程序来检查我的生产者的代码。当我在没有分区键的情况下发送数据时,我的应用程序运行良好。但是,在指定数据分区的键时,我收到错误:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
Run Code Online (Sandbox Code Playgroud)
对于消费者和生产者来说。我在互联网上搜索了很多,他们建议验证 kafka.acl 设置。我在 HDInsight 上使用 kafka,但我不知道如何验证它并解决此问题。
我的集群有以下配置:
我的生产者代码:
public static void produce(String brokers, String topicName) throws IOException{
// Set properties …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 python 2.7 使用 kafka-python 2.0.1 生成 Kafka 主题的消息(由于一些与工作场所相关的限制,无法使用 Python 3)
我在单独的环境中创建了一个类,并编译了该包并安装在虚拟环境中:
import json
from kafka import KafkaProducer
class KafkaSender(object):
def __init__(self):
self.producer = self.get_kafka_producer()
def get_kafka_producer(self):
return KafkaProducer(
bootstrap_servers=['locahost:9092'],
value_serializer=lambda x: json.dumps(x),
request_timeout_ms=2000,
)
def send(self, data):
self.producer.send("topicname", value=data)
Run Code Online (Sandbox Code Playgroud)
我的驱动程序代码是这样的:
from mypackage import KafkaSender
# driver code
data = {"a":"b"}
kafka_sender = KafkaSender()
kafka_sender.send(data)
Run Code Online (Sandbox Code Playgroud)
场景1:
我运行这段代码,它运行得很好,没有错误,但消息没有推送到主题。我已经确认这一点,因为该主题中的偏移或滞后没有增加。此外,消费者端没有记录任何内容。
场景 2:
从方法中注释/删除 Kafka 生产者的初始化__init__。
我将发送行从 更改为
self.producer.send("topicname", value=data) ie self.get_kafka_producer().send("topicname", value=data),不是提前(在类初始化期间)而是在将消息发送到主题之前创建 kafka 生产者。当我运行代码时,它运行得很好。该消息已发布到该主题。
我使用场景 1 的目的是创建一个 Kafka 生产者一次并使用它多次,而不是每次我想发送消息时都创建 Kafka …
我创建了 3 个 Kafka 经纪人设置,经纪人 ID 为 20、21、22。然后我创建了这个主题:
bin/kafka-topics.sh --zookeeper localhost:2181 \
--create --topic zeta --partitions 4 --replication-factor 3
Run Code Online (Sandbox Code Playgroud)
结果是:
当生产者向主题 zeta 发送消息“hello world”时,Kafka 首先将消息写入哪个分区?
“hello world”消息会在所有 4 个分区中复制吗?
3 个代理中的每个代理都包含所有 4 个分区?这与上述上下文中的复制因子 3 有何关系?
如果我有 8 个在自己的进程或线程中并行运行的消费者订阅了 zeta 主题,Kafka 如何分配分区或代理来并行服务这些消费者?
“我正在尝试将消息路由到 Kafka 中的死信主题,以防处理相应消息时出现任何失败。我已经为此功能设置了 SeektoCurrentErrorHandler 和 DeadLetterPublishingRecoverer。
消费者在执行此操作时抛出以下异常:
2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:
org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = …Run Code Online (Sandbox Code Playgroud) java apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
Shopify/sarama是否提供类似于transactional.idJVM API的选项?
该库支持幂等(Config.Producer.Idemponent,类似于enable.idempotence),但我不明白如何在没有transactional.id.
如果我错了,请纠正我,Sarama 中缺少有关这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。
我在源代码和一些测试(例如)中找到了相关属性,但不明白如何在外部使用它们。