我对卡夫卡比较新.我已经做了一些实验,但有一些事情我不清楚消费者抵消.从我到目前为止所理解的情况来看,当消费者开始时,它将开始读取的偏移量由配置设置决定auto.offset.reset
(如果我错了,请纠正我).
现在说,例如主题中有10条消息(偏移0到9),并且消费者在它关闭之前(或者在我杀死消费者之前)恰好消耗了其中的5条消息.然后说我重启那个消费者流程.我的问题是:
如果auto.offset.reset
设置为smallest
,它是否总是从偏移量0开始消耗?
如果auto.offset.reset
设置为largest
,是否将从偏移量5开始消耗?
关于这种情况的行为总是确定的吗?
如果我的问题中的任何内容不清楚,请不要犹豫.提前致谢.
我有一个运行2个分区的Kafka集群.我一直在寻找一种方法将分区数增加到3.但是,我不想丢失主题中的现有消息.我尝试停止Kafka,修改server.properties
文件以将分区数增加到3并重新启动Kafka.但是,这似乎没有任何改变.使用Kafka ConsumerOffsetChecker
,我仍然看到它只使用了2个分区.我使用的Kafka版本是0.8.2.2.在0.8.1版本中,曾经有一个被调用的脚本kafka-add-partitions.sh
,我想这可能会成功.但是,我在0.8.2中没有看到任何这样的脚本.有没有办法实现这个?我确实尝试创建一个全新的主题,对于那个主题,它似乎根据server.properties
文件中的更改使用了3个分区.但是,对于现有主题,它似乎并不关心.
我最近一直在学习LMAX Disruptor并且正在做一些实验.令我困惑的一件事是处理程序方法的endOfBatch
参数.请考虑以下代码.首先,我调用的虚拟消息和消费者类,以及:onEvent
EventHandler
Test1
Test1Worker
public class Test1 {
}
public class Test1Worker implements EventHandler<Test1>{
public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
try{
Thread.sleep(500);
}
catch(Exception e){
e.printStackTrace();
}
System.out.println("Received message with sequence " + sequence + ". "
+ "EndOfBatch = " + endOfBatch);
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,我已经延迟了500毫秒,以替代一些现实世界的工作.我也在控制台中打印了序列号
然后我的驱动程序类(作为生产者)调用DisruptorTest
:
public class DisruptorTest {
private static Disruptor<Test1> bus1;
private static ExecutorService test1Workers;
public static void main(String[] args){
test1Workers = Executors.newFixedThreadPool(1);
bus1 = new Disruptor<Test1>(new …
Run Code Online (Sandbox Code Playgroud) java multithreading producer-consumer disruptor-pattern lmax
我对 grpc 比较陌生。我遵循了 grpc java 教程,并且能够使用协议缓冲区生成所需的类并使客户端和服务器运行,这一切都很好。现在我想知道服务器可以同时使用多少个客户端连接?有什么办法可以配置吗?
我需要知道它的原因是我将在 32 线程机器上部署服务器,并且每个客户端请求将由一堆并行工作线程处理。所以我需要一些关于(最多)每个请求应该产生多少线程的指标,因为这应该受到服务器可以接受的同时连接数的限制。
我有一个关于 的背景下的侧面输入和广播的一般性问题Apache Beam
。期间计算所需的任何其他变量、列表、映射是否processElement
需要作为辅助输入传递?如果它们作为普通构造函数参数传递可以吗DoFn
?例如,如果我有一些固定(未计算)值变量(常量,如开始日期、结束日期),我想在processElement
. PCollectionView
现在,我可以分别从每个变量中创建单例,并将它们DoFn
作为侧面输入传递给构造函数。但是,我可以不这样做,而只是将每个常量作为普通构造函数参数传递给 吗DoFn
?我在这里错过了一些微妙的东西吗?
就代码而言,我什么时候应该这样做:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
// these are singleton views
private final PCollectionView<LocalDateTime> dateStartView;
private final PCollectionView<LocalDateTime> dateEndView;
public MyFilter(PCollectionView<LocalDateTime> dateStartView,
PCollectionView<LocalDateTime> dateEndView){
this.dateStartView = dateStartView;
this.dateEndView = dateEndView;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception{
// extract date values from the singleton views here and use them
Run Code Online (Sandbox Code Playgroud)
与以下相反:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> { …
Run Code Online (Sandbox Code Playgroud) dataflow broadcast data-processing google-cloud-dataflow apache-beam
我最近一直在与卡夫卡合作,对消费者群体下的消费者有点困惑.混淆的中心是将消费者实现为流程还是线程.对于这个问题,假设我正在使用高级消费者.
让我们考虑一下我尝试过的场景.在我的主题中有2个分区(为简单起见,我们假设复制因子只有1).我创建了一个消费者(ConsumerConnector
)过程consumer1
与组group1
,然后创建尺寸2的主题计数地图,然后产生了2个消费者线程consumer1_thread1
和consumer1_thread2
该过程下.它看起来像consumer1_thread1
正在消耗分区0
并且consumer1_thread2
正在消耗分区1
.这种行为总是确定的吗?以下是代码段.Class TestConsumer
是我的消费者线程类.
...
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(2));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new TestConsumer(stream, threadNumber));
threadNumber++;
}
...
Run Code Online (Sandbox Code Playgroud)
现在,让我们考虑另一个场景(我没有尝试但很好奇),我开始2个消费者进程consumer1
,consumer2
两个都有相同的组group1
,每个都是一个单线程进程.现在我的问题是:
在这种情况下,两个独立的消费者流程(在同一群组下)如何与分区相关联?它与上述单进程多线程场景有何不同?
通常,消费者线程或进程如何映射/与主题中的分区相关?
Kafka文档确实说消费者组下的每个消费者将使用一个分区.但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者流程?
关于将消费者作为流程与线程实现,我在这里缺少任何微妙的东西吗?提前致谢.
java multithreading distributed-computing apache-kafka kafka-consumer-api
假设您获得了一组间隔(1,5),(6,10),(3,8),(7,9)。我期望的输出是3,因为最多有3个彼此相交的区间(3,8),(6,10)和(7,9)。请注意,(1,5)和(3,8)也相交,但这只是其中的2个。因此,这里的答案将是3,因为3是彼此相交的最大间隔数。
有哪些有效的查找方法?我想第一步将是根据起始值对时间间隔进行排序。之后有什么建议吗?