使用 RabbitMQ 源的 Spark 结构化流

ram*_*ram 6 java rabbitmq apache-spark spark-structured-streaming

我正在尝试编写一个自定义接收器Structured Streaming,它将使用来自RabbitMQ. Spark 最近发布了DataSource V2 API,看起来很有前途。因为它抽象了许多细节,所以为了简单和性能,我想使用这个 API。但是,由于它很新,因此可用的资源并不多。我需要有经验的人澄清一下Spark人,因为他们会更容易掌握关键点。开始了:

我的起点是博客文章系列,第一部分在这里。它展示了如何在没有流功能的情况下实现数据源。为了制作流媒体源,我稍微改变了它们,因为我需要实现MicroBatchReadSupport而不是(或除了)DataSourceV2

为了提高效率,明智的做法是让多个 spark executorRabbitMQ同时消费,即来自同一个队列。如果我没有感到困惑,输入的每个分区 -inSpark的术语 - 对应于队列中的一个消费者 -inRabbitMQ术语。因此,我们需要为输入流设置多个分区,对吗?

本系列的第 4 部分类似,我实现了MicroBatchReader,如下所示:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    int partition = options.getInt(RMQ.PARTITICN, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}
Run Code Online (Sandbox Code Playgroud)

我正在返回一个工厂列表,并希望列表中的每个实例都将用于创建一个阅读器,它也将是一个消费者。这种方法正确吗?

我希望我的接收者是可靠的,即在每条处理过的消息(或至少写入 chekpoint 目录以供进一步处理)之后,我需要将其确认为RabbitMQ. 问题从这里开始:这些工厂是在驱动程序中创建的,实际的读取过程通过DataReader在执行程序中进行。但是,commit方法是 的一部分MicroBatchReader,而不是DataReader。由于我有很多DataReaders per MicroBatchReader,我应该如何将这些消息回复到RabbitMQ?或者我应该在调用下一个方法时确认DataReader?安全吗?如果是这样,那么commit函数的目的是什么?

澄清: 混淆:关于重命名某些类/函数的答案中提供的链接(除了那里的解释)使一切更清楚 比以往任何时候都更糟。从那里引用:

重命名:

  • DataReaderFactoryInputPartition

  • DataReaderInputPartitionReader

...

InputPartition的目的是管理关联阅读器的生命周期,现在称为InputPartitionReader,使用显式创建操作来镜像关闭操作。从 API 中不再清楚这一点,因为它DataReaderFactory似乎比它更通用,并且不清楚为什么要生成一组它们以供读取。

编辑:但是,文档明确指出“读取器工厂将被序列化并发送给执行器,然后将在执行器上创建数据读取器并进行实际读取。”

为了使消费者可靠,我必须仅在 Spark 端提交特定消息后才对其进行确认。请注意,消息必须在其传递的同一连接上被确认,但在驱动程序节点调用提交函数。如何在工作程序/执行程序节点上提交?

alz*_*lz2 0

> 我正在返回一个工厂列表,并希望列表中的每个实例都将用于创建一个读取器,该读取器也是一个消费者。这种做法正确吗?源 [socket][1] 源实现有一个线程将消息推送到内部 ListBuffer。换句话说,有一个消费者(线程)填充内部 ListBuffer,然后由“planInputPartitions”将其划分为多个分区(“createDataReaderFactories”[重命名][2]为“planInputPartitions”)。另外,根据 [MicroBatchReadSupport][3] 的 Javadoc > 执行引擎将在流式查询开始时创建一个微批次读取器,为每个批次交替调用 setOffsetRange 和 createDataReaderFactories 进行处理,然后调用 stop()当执行完成时。请注意,由于重启或故障恢复,单个查询可能会多次执行。换句话说,“createDataReaderFactories”应该被调用**多次**次,据我了解,这表明每个“DataReader”负责一个静态输入分区,这意味着DataReader不应该是消费者。---------- > 但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader ...如果是这样,那么 commit 函数的目的是什么?也许提交函数的部分原理是为了防止 MicroBatchReader 的内部缓冲区变大。通过提交偏移量,您可以有效地从缓冲区中删除小于偏移量的元素,因为您承诺不再处理它们。您可以使用“batches.trimStart(offsetDiff)”在套接字源代码中看到这种情况发生


我不确定是否要实现一个可靠的接收器,所以我希望有一个更有经验的 Spark 人员过来回答你的问题,因为我也很感兴趣!希望这可以帮助!

编辑

我只研究了套接字维基编辑源。这些来源尚未准备好用于生产,这是问题所没有寻找的。相反,kafka源是更好的起点,与上述源不同,它拥有像作者正在寻找的多个消费者。

然而,也许如果您正在寻找不可靠的来源,上面的套接字和维基编辑来源提供了一个不太复杂的解决方案。