在Spark版本1.2.0中,可以使用subtract
2 SchemRDD
秒来结束与第一个不同的内容
val onlyNewData = todaySchemaRDD.subtract(yesterdaySchemaRDD)
Run Code Online (Sandbox Code Playgroud)
onlyNewData
包含todaySchemRDD
不存在的行yesterdaySchemaRDD
.
如何DataFrames
在Spark 1.3.0版本中实现这一目标?
我正在对存储在S3中的一些LZO压缩日志文件运行EMR Spark作业.有几个日志文件存储在同一个文件夹中,例如:
...
s3://mylogfiles/2014-08-11-00111.lzo
s3://mylogfiles/2014-08-11-00112.lzo
...
Run Code Online (Sandbox Code Playgroud)
在spark-shell中,我正在运行一个计算文件中行数的作业.如果我为每个文件单独计算行数,则没有问题,例如:
// Works fine
...
sc.textFile("s3://mylogfiles/2014-08-11-00111.lzo").count()
sc.textFile("s3://mylogfiles/2014-08-11-00112.lzo").count()
...
Run Code Online (Sandbox Code Playgroud)
如果我使用通配符加载带有单行的所有文件,我会得到两种异常.
// One-liner throws exceptions
sc.textFile("s3://mylogfiles/*.lzo").count()
Run Code Online (Sandbox Code Playgroud)
例外情况是:
java.lang.InternalError: lzo1x_decompress_safe returned: -6
at com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)
Run Code Online (Sandbox Code Playgroud)
和
java.io.IOException: Compressed length 1362309683 exceeds max block size 67108864 (probably corrupt file)
at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:291)
Run Code Online (Sandbox Code Playgroud)
在我看来,解决方案是由最后一个例外给出的文字暗示,但我不知道如何继续.LZO文件允许的大小是有限制的,或者问题是什么?
我的问题是:我可以运行Spark查询加载S3文件夹中的所有LZO压缩文件,而不会获得与I/O相关的异常吗?
每个文件有66个大约200MB的文件.
编辑:只有在使用Hadoop2核心库(ami 3.1.0)运行Spark时才会出现异常.当使用Hadoop1核心库(ami 2.4.5)运行时,一切正常.两个案例都使用Spark 1.0.1进行了测试.
我读到Azure上的Signalr需要服务总线实现(例如https://github.com/SignalR/SignalR/wiki/Azure-service-bus)以实现可伸缩性.
但是,我的服务器只对单个客户端(调用者)进行回调:
// Invoke a method on the calling client
Caller.addMessage(data);
Run Code Online (Sandbox Code Playgroud)
如果不需要Signalr的广播功能,还需要一个底层服务总线吗?