小编jit*_*npt的帖子

Spark结构化流 - 将静态数据集与流数据集连接起来

我正在Spark structured streaming用来处理从中读取的记录Kafka.这是我想要实现的目标:

(a)每条记录都是一种Tuple2类型(Timestamp, DeviceId).

(b)我创建了一个静态Dataset[DeviceId],其中包含DeviceId预期在Kafka流中看到的所有有效设备ID(类型)的集合.

(c)我需要写一个Spark structured streaming查询

 (i) Groups records by their timestamp into 5-minute windows
 (ii) For each window, get the list of valid device IDs that were **not** seen in that window
Run Code Online (Sandbox Code Playgroud)

例如,假设所有有效设备ID的列表都是,[A,B,C,D,E]并且某个5分钟窗口中的kafka记录包含设备ID [A,B,E].然后,对于该窗口,我正在寻找的看不见的设备ID列表是[C,D].

  1. 如何在Spark结构化流中编写此查询?我尝试使用公开的方法except()join()方法Dataset.但是,他们都抛出了一个运行时异常,抱怨说这些操作都不受支持streaming Dataset.

这是我的代码片段:

val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming

13
推荐指数
2
解决办法
3717
查看次数

sbt 聚合单元和集成测试的覆盖率报告

我用的sbt插件scoverage来为我的斯卡拉项目的测试覆盖率报告。但是,我无法合并单元测试和集成测试的测试报告。

这是我运行的命令

sbt coverage test // to run unit tests
sbt coverage it:test // to run integration tests
sbt coverageReport // to generate coverage report
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我只得到了集成测试的覆盖率报告。

  • 如何生成汇总单元测试和集成测试结果的报告?

提前致谢。

integration-testing unit-testing scala sbt scoverage

9
推荐指数
1
解决办法
6208
查看次数

如何以编程方式将程序集引用添加到项目?

我一直在尝试使用Roslyn来解析解决方案文件,并以编程方式为解决方案中的每个项目添加自定义程序集引用.

我尝试使用以下代码片段来执行相同的操作:

//The name of the DLL is customLib.dll
var reference = MetadataReference.CreateAssemblyReference("customLib");
project = project.AddMetadataReference(reference);
Run Code Online (Sandbox Code Playgroud)

但是,它在创建MetadataReference时遇到FileNotFoundException.

所以,我的问题是:如何指定Roslyn需要检查指定dll的路径

谢谢.

c# roslyn

7
推荐指数
1
解决办法
2487
查看次数

使用 sql.js 将 CSV 文件导入 SQLite

我有一个 CSV 文件,我想将其导入到 SQLite 数据库中。我发现这可以通过 SQLite 命令行来完成。但是,我想知道是否可以通过 javascript 导入 CSV。

目前,我正在使用sql.js库通过 Javascript 与 SQLite 进行交互。

提前致谢!

javascript database csv sqlite

6
推荐指数
1
解决办法
4480
查看次数

Spark Streaming应用程序中多个持续时间不同的窗口

我想使用Spark Streaming处理来自Kafka的实时数据流。我需要从传入流中计算各种统计信息,并且需要针对持续时间不同的窗口进行计算。例如,我可能需要计算最近5分钟的统计信息“ A”的平均值,同时计算最近1小时的统计信息“ B”的中位数。

在这种情况下,推荐使用Spark Streaming的方法是什么?以下是我可能想到的一些选择:

(i)从Kafka获得一个DStream,并使用window()方法从其中创建多个DStream。对于每个结果DStream,windowDuration将根据需要设置为不同的值。例如:

// pseudo-code
val streamA = kafkaDStream.window(Minutes(5), Minutes(1))
val streamB = kafkaDStream.window(Hours(1), Minutes(10))
Run Code Online (Sandbox Code Playgroud)

(ii)运行单独的Spark Streaming应用-每个统计信息一个

问题

对我来说(i)似乎是一种更有效的方法。但是,我对此有一些疑问:

  • streamA和streamB如何在底层数据结构中表示。
  • 他们会共享数据-因为它们源自KafkaDStream?还是会有重复的数据?
  • 另外,还有更有效的方法来处理这种用例。

提前致谢

analytics real-time apache-kafka apache-spark spark-streaming

5
推荐指数
1
解决办法
1476
查看次数

Spark + Kafka集成 - 将Kafka分区映射到RDD分区

我有几个与Spark Streaming相关的基本问题

[如果这些问题已在其他帖子中得到解答,请告诉我 - 我找不到任何问题]:

(i)在Spark Streaming中,默认情况下RDD中的分区数是否等于工作者数?

(ii)在Spark-Kafka集成的直接方法中,创建的RDD分区数等于Kafka分区的数量.假设每个RDD分区i都映射到j每个批处理中的同一个工作节点是否正确DStream?即,是否仅基于分区的索引将分区映射到工作节点?例如,可以将分区2分配给一个批次中的worker 1,将另一个分区分配给worker 3吗?

提前致谢

scala apache-kafka apache-spark spark-streaming apache-spark-1.4

5
推荐指数
1
解决办法
833
查看次数