小编Ank*_*ana的帖子

为什么 kafka 无法提交特定分区的偏移量?

kafka 消费者无法仅提交特定分区的偏移量。

aklsfoipafasldmaknfa    asiofuasofiusaofasd
[2019-01-04 12:22:22,691] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-11955] Offset commit failed on partition my-topic-2-9 at offset 0: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-01-04 12:22:28,617] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-11955] Offset commit failed on partition my-topic-2-9 at offset 1: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
as;lkasl;dkas;faskfasfasfasodaspd   qdoiwudqouoaisdiaduasodiuasd
[2019-01-04 12:23:18,875] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-11955] Offset commit failed on partition my-topic-2-9 at offset 1: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Run Code Online (Sandbox Code Playgroud)

谁能向我解释这个错误以及可能导致此错误的原因是什么?

我们的集群有 5 个代理在 AWS 中运行。我们使用 Apache Kafka 2.1。

我正在运行一个非常简单的 Kafka 控制台生产者,并使用 Kafka 控制台消费者消费相同的消息。 …

apache-kafka

6
推荐指数
1
解决办法
8667
查看次数

Apache kafka 生产集群设置问题

我们一直在尝试在 AWS Linux 机器上建立一个生产级的 Kafka 集群,直到现在我们都没有成功。

卡夫卡版本:2.1.0

机器:

5 r5.xlarge machines for 5 Kafka brokers.
3 t2.medium zookeeper nodes
1 t2.medium node for schema-registry and related tools. (a Single instance of each)
1 m5.xlarge machine for Debezium.
Run Code Online (Sandbox Code Playgroud)

默认代理配置:

num.partitions=15
min.insync.replicas=1
group.max.session.timeout.ms=2000000 
log.cleanup.policy=compact
default.replication.factor=3
zookeeper.session.timeout.ms=30000
Run Code Online (Sandbox Code Playgroud)

我们的问题主要与海量数据有关。我们正在尝试使用 debezium 传输 kafka 主题中的现有表。这些表中的许多都非常庞大,有超过 50000000 行。

到现在为止,我们已经尝试了很多方法,但是我们的集群每次都会因为一个或多个原因而失败。

错误在计划任务“isr-expiration”(kafka.utils.KafkaScheduler)org.apache.zookeeper.KeeperException$SessionExpiredException 中未捕获异常:KeeperErrorCode = Session 在 org.apache 的 /brokers/topics/__consumer_offsets/partitions/0/state 已过期。 zookeeper.KeeperException.create(KeeperException.java:130) 在 org.apache.zookeeper.KeeperException.create(KeeperException.java:54)..

错误 2:

] INFO [Partition xxx.public.driver_operation-14 broker=3] Cached zkVersion [21] 不等于zookeeper,跳过更新ISR (kafka.cluster.Partition) [2018-12-12 14:07:26,551] INFO [分区 …

apache-kafka apache-kafka-connect debezium

5
推荐指数
1
解决办法
3339
查看次数

卡夫卡经纪人花了太长时间才出现

最近,我们的 Kafka 经纪人之一(共 5 个)被错误关闭。现在我们再次启动它,有很多关于损坏的索引文件的警告消息,即使在 24 小时后,代理仍在启动。该代理中有超过 400 GB 的数据。

尽管其余代理已启动并正在运行,但某些分区将 -1 显示为它们的领导者,而将坏代理显示为唯一的 ISR。我没有看到其他副本被任命为新的领导者,也许是因为坏代理是这些分区唯一同步的代理。

Broker Properties: 
Replication Factor: 3
Min In Sync Replicas: 1
Run Code Online (Sandbox Code Playgroud)

我不知道如何处理这个。我应该等待经纪人自己解决所有问题吗?花这么多时间正常吗?

还有什么我可以做的吗?请帮忙。

apache-kafka

3
推荐指数
1
解决办法
1680
查看次数

如何指定连接窗口的保留期限?

我想加入两个流,并且将合并窗口设置为25小时,因为要合并的记录最多可以相隔24小时。

final Long JOIN_WINDOW = TimeUnit.HOURS.toMillis(25);

kstream.join(
  runsheetIdStream,
  (jt,r) -> { jt.setDate(r.getStart_date()); return jt; },
  JoinWindows.of(JOIN_WINDOW),
  Joined.with(Serdes.Long(),jobTransactionSerde,runsheetSerde))
Run Code Online (Sandbox Code Playgroud)

这将引发以下异常:

org.apache.kafka.streams.errors.TopologyException:无效的拓扑:联接窗口KSTREAM-JOINTHIS-0000000016-store的保留期必须不小于其窗口大小。

如何延长保留期限?

apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
432
查看次数

当流应用程序有多个实例时,有状态操作如何在Kafka流中工作?

状态完整操作如何在具有多个实例的Kafka Stream应用程序中工作?让我们说我们有2个主题,每个A和B有2个分区.我们有一个流应用程序,它既消耗了两个主题,又有两个流之间的连接.

现在我们正在运行此流应用程序的2个实例.据我所知,每个实例将分配每个主题的2个分区之一.

现在,如果要连接的消息被应用程序的不同实例使用,联接将如何发生?我无法理解它.

虽然我测试了一个似乎工作正常的小流应用程序.我是否可以在不考虑流应用程序中定义的拓扑类型的情况下,始终增加任何类型应用程序的实例数量?

是否有任何文件可以让我了解其工作细节?

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
99
查看次数