Mar*_*rco 5 apache-spark spark-streaming dstream
我正在使用Apache Spark Streaming 1.6.1编写一个Java应用程序,它连接两个Key/Value数据流并将输出写入HDFS.这两个数据流包含K/V字符串,并使用textFileStream()定期在HDFS中从Spark中摄取.
两个数据流不同步,这意味着在时间t0在stream1中的一些键可以在时间t1出现在stream2中,反之亦然.因此,我的目标是加入两个流并计算"剩余"密钥,这应该考虑在下一个批处理间隔中的连接操作.
为了更好地澄清这一点,请查看以下算法:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
Run Code Online (Sandbox Code Playgroud)
我试图用Spark Streaming实现这个算法失败了.最初,我以这种方式为剩余密钥创建了两个空流(这只是一个流,但生成第二个流的代码类似):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
Run Code Online (Sandbox Code Playgroud)
稍后,这个空流与stream1统一(即union()),最后,在连接之后,我从stream1添加剩余的密钥并调用window().stream2也是如此.
问题是生成left_keys_s1和left_keys_s2的操作是没有操作的转换,这意味着Spark不会创建任何RDD流图,因此它们永远不会被执行.我现在得到的是一个连接,它只输出其键在同一时间间隔内的stream1和stream2中的记录.
你们有什么建议用Spark正确实现吗?
谢谢,马可
通过保留对保存这些值的 RDD 的引用,应该可以将值从一批转移到下一批。
不要尝试使用 合并流queueDStream,而是声明一个可变的 RDD 引用,该引用可以在每个流间隔更新。
这是一个例子:
在这个流作业中,我们从一个 RDD 开始100。每个时间间隔10都会生成随机数并减去最初的 100 个整数。这个过程一直持续到具有 100 个元素的初始 RDD 为空。此示例演示如何将元素从一个间隔转移到下一个间隔。
import scala.util.Random
import org.apache.spark.streaming.dstream._
val ssc = new StreamingContext(sparkContext, Seconds(2))
var targetInts:RDD[Int] = sc.parallelize(0 until 100)
var loops = 0
// we create an rdd of functions that generate random data.
// evaluating this RDD at each interval will generate new random data points.
val randomDataRdd = sc.parallelize(1 to 10).map(_ => () => Random.nextInt(100))
val dstream = new ConstantInputDStream(ssc, randomDataRdd)
// create values from the random func rdd
dataDStream.foreachRDD{rdd =>
loops += 1
targetInts = targetInts.subtract(rdd)
if (targetInts.isEmpty) {println(loops); ssc.stop(false)}
}
ssc.start()
Run Code Online (Sandbox Code Playgroud)
运行这个例子并绘图loops,targetInts.count得出以下图表:
我希望这能为您提供足够的指导来实现完整的用例。
| 归档时间: |
|
| 查看次数: |
1159 次 |
| 最近记录: |