小编Asi*_*bal的帖子

什么决定卡夫卡消费者的抵消?

我对卡夫卡比较新.我已经做了一些实验,但有一些事情我不清楚消费者抵消.从我到目前为止所理解的情况来看,当消费者开始时,它将开始读取的偏移量由配置设置决定auto.offset.reset(如果我错了,请纠正我).

现在说,例如主题中有10条消息(偏移0到9),并且消费者在它关闭之前(或者在我杀死消费者之前)恰好消耗了其中的5条消息.然后说我重启那个消费者流程.我的问题是:

  1. 如果auto.offset.reset设置为smallest,它是否总是从偏移量0开始消耗?

  2. 如果auto.offset.reset设置为largest,是否将从偏移量5开始消耗?

  3. 关于这种情况的行为总是确定的吗?

如果我的问题中的任何内容不清楚,请不要犹豫.提前致谢.

java distributed-computing apache-kafka

151
推荐指数
3
解决办法
7万
查看次数

是否可以在Kafka 0.8.2中为现有主题添加分区

我有一个运行2个分区的Kafka集群.我一直在寻找一种方法将分区数增加到3.但是,我不想丢失主题中的现有消息.我尝试停止Kafka,修改server.properties文件以将分区数增加到3并重新启动Kafka.但是,这似乎没有任何改变.使用Kafka ConsumerOffsetChecker,我仍然看到它只使用了2个分区.我使用的Kafka版本是0.8.2.2.在0.8.1版本中,曾经有一个被调用的脚本kafka-add-partitions.sh,我想这可能会成功.但是,我在0.8.2中没有看到任何这样的脚本.有没有办法实现这个?我确实尝试创建一个全新的主题,对于那个主题,它似乎根据server.properties文件中的更改使用了3个分区.但是,对于现有主题,它似乎并不关心.

java distributed-computing apache-kafka

48
推荐指数
6
解决办法
4万
查看次数

LMAX Disruptor - 决定批量大小的因素是什么?

我最近一直在学习LMAX Disruptor并且正在做一些实验.令我困惑的一件事是处理程序方法的endOfBatch参数.请考虑以下代码.首先,我调用的虚拟消息和消费者类,以及:onEventEventHandlerTest1Test1Worker

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意,我已经延迟了500毫秒,以替代一些现实世界的工作.我也在控制台中打印了序列号

然后我的驱动程序类(作为生产者)调用DisruptorTest:

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new …
Run Code Online (Sandbox Code Playgroud)

java multithreading producer-consumer disruptor-pattern lmax

6
推荐指数
1
解决办法
2512
查看次数

如何在grpc中配置最大同时连接数

我对 grpc 比较陌生。我遵循了 grpc java 教程,并且能够使用协议缓冲区生成所需的类并使客户端和服务器运行,这一切都很好。现在我想知道服务器可以同时使用多少个客户端连接?有什么办法可以配置吗?

我需要知道它的原因是我将在 32 线程机器上部署服务器,并且每个客户端请求将由一堆并行工作线程处理。所以我需要一些关于(最多)每个请求应该产生多少线程的指标,因为这应该受到服务器可以接受的同时连接数的限制。

java multithreading rpc grpc

5
推荐指数
1
解决办法
6968
查看次数

Apache Beam 中的侧面输入与普通构造函数参数

我有一个关于 的背景下的侧面输入和广播的一般性问题Apache Beam。期间计算所需的任何其他变量、列表、映射是否processElement需要作为辅助输入传递?如果它们作为普通构造函数参数传递可以吗DoFn?例如,如果我有一些固定(未计算)值变量(常量,如开始日期、结束日期),我想在processElement. PCollectionView现在,我可以分别从每个变量中创建单例,并将它们DoFn作为侧面输入传递给构造函数。但是,我可以不这样做,而只是将每个常量作为普通构造函数参数传递给 吗DoFn?我在这里错过了一些微妙的东西吗?

就代码而言,我什么时候应该这样做:

public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
  // these are singleton views
  private final PCollectionView<LocalDateTime> dateStartView;
  private final PCollectionView<LocalDateTime> dateEndView;

  public MyFilter(PCollectionView<LocalDateTime> dateStartView,
                       PCollectionView<LocalDateTime> dateEndView){

      this.dateStartView = dateStartView;
      this.dateEndView = dateEndView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception{
  // extract date values from the singleton views here and use them
Run Code Online (Sandbox Code Playgroud)

与以下相反:

public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> { …
Run Code Online (Sandbox Code Playgroud)

dataflow broadcast data-processing google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
2538
查看次数

Kafka使用者 - 消费者进程和线程与主题分区的关系是什么

我最近一直在与卡夫卡合作,对消费者群体下的消费者有点困惑.混淆的中心是将消费者实现为流程还是线程.对于这个问题,假设我正在使用高级消费者.

让我们考虑一下我尝试过的场景.在我的主题中有2个分区(为简单起见,我们假设复制因子只有1).我创建了一个消费者(ConsumerConnector)过程consumer1与组group1,然后创建尺寸2的主题计数地图,然后产生了2个消费者线程consumer1_thread1consumer1_thread2该过程下.它看起来像consumer1_thread1正在消耗分区0并且consumer1_thread2正在消耗分区1.这种行为总是确定的吗?以下是代码段.Class TestConsumer是我的消费者线程类.

    ...
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(2));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...
Run Code Online (Sandbox Code Playgroud)

现在,让我们考虑另一个场景(我没有尝试但很好奇),我开始2个消费者进程consumer1,consumer2两个都有相同的组group1,每个都是一个单线程进程.现在我的问题是:

  1. 在这种情况下,两个独立的消费者流程(在同一群组下)如何与分区相关联?它与上述单进程多线程场景有何不同?

  2. 通常,消费者线程或进程如何映射/与主题中的分区相关?

  3. Kafka文档确实说消费者组下的每个消费者将使用一个分区.但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者流程?

  4. 关于将消费者作为流程与线程实现,我在这里缺少任何微妙的东西吗?提前致谢.

java multithreading distributed-computing apache-kafka kafka-consumer-api

4
推荐指数
1
解决办法
7842
查看次数

给定一组间隔,如何找到它们之间的最大交点数,

假设您获得了一组间隔(1,5),(6,10),(3,8),(7,9)。我期望的输出是3,因为最多有3个彼此相交的区间(3,8),(6,10)和(7,9)。请注意,(1,5)和(3,8)也相交,但这只是其中的2个。因此,这里的答案将是3,因为3是彼此相交的最大间隔数。

有哪些有效的查找方法?我想第一步将是根据起始值对时间间隔进行排序。之后有什么建议吗?

arrays sorting algorithm data-structures

3
推荐指数
1
解决办法
944
查看次数