ram*_*ram 6 java rabbitmq apache-spark spark-structured-streaming
我正在尝试编写一个自定义接收器Structured Streaming,它将使用来自RabbitMQ.
Spark 最近发布了DataSource V2 API,看起来很有前途。因为它抽象了许多细节,所以为了简单和性能,我想使用这个 API。但是,由于它很新,因此可用的资源并不多。我需要有经验的人澄清一下Spark人,因为他们会更容易掌握关键点。开始了:
我的起点是博客文章系列,第一部分在这里。它展示了如何在没有流功能的情况下实现数据源。为了制作流媒体源,我稍微改变了它们,因为我需要实现MicroBatchReadSupport而不是(或除了)DataSourceV2。
为了提高效率,明智的做法是让多个 spark executorRabbitMQ同时消费,即来自同一个队列。如果我没有感到困惑,输入的每个分区 -inSpark的术语 - 对应于队列中的一个消费者 -inRabbitMQ术语。因此,我们需要为输入流设置多个分区,对吗?
与本系列的第 4 部分类似,我实现了MicroBatchReader,如下所示:
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
int partition = options.getInt(RMQ.PARTITICN, 5);
List<DataReaderFactory<Row>> factories = new LinkedList<>();
for (int i = 0; i < partition; i++) {
factories.add(new RMQDataReaderFactory(options));
}
return factories;
}
Run Code Online (Sandbox Code Playgroud)
我正在返回一个工厂列表,并希望列表中的每个实例都将用于创建一个阅读器,它也将是一个消费者。这种方法正确吗?
我希望我的接收者是可靠的,即在每条处理过的消息(或至少写入 chekpoint 目录以供进一步处理)之后,我需要将其确认为RabbitMQ. 问题从这里开始:这些工厂是在驱动程序中创建的,实际的读取过程通过DataReader在执行程序中进行。但是,commit方法是 的一部分MicroBatchReader,而不是DataReader。由于我有很多DataReaders per MicroBatchReader,我应该如何将这些消息回复到RabbitMQ?或者我应该在调用下一个方法时确认DataReader?安全吗?如果是这样,那么commit函数的目的是什么?
澄清: 混淆:关于重命名某些类/函数的答案中提供的链接(除了那里的解释)使一切更清楚 比以往任何时候都更糟。从那里引用:
重命名:
DataReaderFactory到InputPartition
DataReader到InputPartitionReader...
InputPartition的目的是管理关联阅读器的生命周期,现在称为InputPartitionReader,使用显式创建操作来镜像关闭操作。从 API 中不再清楚这一点,因为它DataReaderFactory似乎比它更通用,并且不清楚为什么要生成一组它们以供读取。
编辑:但是,文档明确指出“读取器工厂将被序列化并发送给执行器,然后将在执行器上创建数据读取器并进行实际读取。”
为了使消费者可靠,我必须仅在 Spark 端提交特定消息后才对其进行确认。请注意,消息必须在其传递的同一连接上被确认,但在驱动程序节点调用提交函数。如何在工作程序/执行程序节点上提交?
> 我正在返回一个工厂列表,并希望列表中的每个实例都将用于创建一个读取器,该读取器也是一个消费者。这种做法正确吗?源 [socket][1] 源实现有一个线程将消息推送到内部 ListBuffer。换句话说,有一个消费者(线程)填充内部 ListBuffer,然后由“planInputPartitions”将其划分为多个分区(“createDataReaderFactories”[重命名][2]为“planInputPartitions”)。另外,根据 [MicroBatchReadSupport][3] 的 Javadoc > 执行引擎将在流式查询开始时创建一个微批次读取器,为每个批次交替调用 setOffsetRange 和 createDataReaderFactories 进行处理,然后调用 stop()当执行完成时。请注意,由于重启或故障恢复,单个查询可能会多次执行。换句话说,“createDataReaderFactories”应该被调用**多次**次,据我了解,这表明每个“DataReader”负责一个静态输入分区,这意味着DataReader不应该是消费者。---------- > 但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader ...如果是这样,那么 commit 函数的目的是什么?也许提交函数的部分原理是为了防止 MicroBatchReader 的内部缓冲区变大。通过提交偏移量,您可以有效地从缓冲区中删除小于偏移量的元素,因为您承诺不再处理它们。您可以使用“batches.trimStart(offsetDiff)”在套接字源代码中看到这种情况发生
编辑
我只研究了套接字和维基编辑源。这些来源尚未准备好用于生产,这是问题所没有寻找的。相反,kafka源是更好的起点,与上述源不同,它拥有像作者正在寻找的多个消费者。
然而,也许如果您正在寻找不可靠的来源,上面的套接字和维基编辑来源提供了一个不太复杂的解决方案。
| 归档时间: |
|
| 查看次数: |
1374 次 |
| 最近记录: |