我正在Spark structured streaming用来处理从中读取的记录Kafka.这是我想要实现的目标:
(a)每条记录都是一种Tuple2类型(Timestamp, DeviceId).
(b)我创建了一个静态Dataset[DeviceId],其中包含DeviceId预期在Kafka流中看到的所有有效设备ID(类型)的集合.
(c)我需要写一个Spark structured streaming查询
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window
Run Code Online (Sandbox Code Playgroud)
例如,假设所有有效设备ID的列表都是,[A,B,C,D,E]并且某个5分钟窗口中的kafka记录包含设备ID [A,B,E].然后,对于该窗口,我正在寻找的看不见的设备ID列表是[C,D].
题
except()和join()方法Dataset.但是,他们都抛出了一个运行时异常,抱怨说这些操作都不受支持streaming Dataset.这是我的代码片段:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming
我用的sbt插件scoverage来为我的斯卡拉项目的测试覆盖率报告。但是,我无法合并单元测试和集成测试的测试报告。
这是我运行的命令
sbt coverage test // to run unit tests
sbt coverage it:test // to run integration tests
sbt coverageReport // to generate coverage report
Run Code Online (Sandbox Code Playgroud)
在上面的例子中,我只得到了集成测试的覆盖率报告。
题
提前致谢。
我一直在尝试使用Roslyn来解析解决方案文件,并以编程方式为解决方案中的每个项目添加自定义程序集引用.
我尝试使用以下代码片段来执行相同的操作:
//The name of the DLL is customLib.dll
var reference = MetadataReference.CreateAssemblyReference("customLib");
project = project.AddMetadataReference(reference);
Run Code Online (Sandbox Code Playgroud)
但是,它在创建MetadataReference时遇到FileNotFoundException.
所以,我的问题是:如何指定Roslyn需要检查指定dll的路径?
谢谢.
我有一个 CSV 文件,我想将其导入到 SQLite 数据库中。我发现这可以通过 SQLite 命令行来完成。但是,我想知道是否可以通过 javascript 导入 CSV。
目前,我正在使用sql.js库通过 Javascript 与 SQLite 进行交互。
提前致谢!
我想使用Spark Streaming处理来自Kafka的实时数据流。我需要从传入流中计算各种统计信息,并且需要针对持续时间不同的窗口进行计算。例如,我可能需要计算最近5分钟的统计信息“ A”的平均值,同时计算最近1小时的统计信息“ B”的中位数。
在这种情况下,推荐使用Spark Streaming的方法是什么?以下是我可能想到的一些选择:
(i)从Kafka获得一个DStream,并使用window()方法从其中创建多个DStream。对于每个结果DStream,windowDuration将根据需要设置为不同的值。例如:
// pseudo-code
val streamA = kafkaDStream.window(Minutes(5), Minutes(1))
val streamB = kafkaDStream.window(Hours(1), Minutes(10))
Run Code Online (Sandbox Code Playgroud)
(ii)运行单独的Spark Streaming应用-每个统计信息一个
问题
对我来说(i)似乎是一种更有效的方法。但是,我对此有一些疑问:
提前致谢
analytics real-time apache-kafka apache-spark spark-streaming
我有几个与Spark Streaming相关的基本问题
[如果这些问题已在其他帖子中得到解答,请告诉我 - 我找不到任何问题]:
(i)在Spark Streaming中,默认情况下RDD中的分区数是否等于工作者数?
(ii)在Spark-Kafka集成的直接方法中,创建的RDD分区数等于Kafka分区的数量.假设每个RDD分区i都映射到j每个批处理中的同一个工作节点是否正确DStream?即,是否仅基于分区的索引将分区映射到工作节点?例如,可以将分区2分配给一个批次中的worker 1,将另一个分区分配给worker 3吗?
提前致谢
scala apache-kafka apache-spark spark-streaming apache-spark-1.4
apache-spark ×3
scala ×3
apache-kafka ×2
analytics ×1
c# ×1
csv ×1
database ×1
javascript ×1
real-time ×1
roslyn ×1
sbt ×1
scoverage ×1
sqlite ×1
unit-testing ×1