When I try to create a DStream inside the call method of a Spark Function, I get the following exception.
My call method:
@Override
public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
    Queue<JavaRDD<Object>> queue = new LinkedList<>();
    queue.add(v1);
    JavaDStream<Object> dStream = context.queueStream(queue);
    JavaDStream<Object> newDStream = dStream.map(AbstractProcessor.this);
    final JavaRDD<Object> rdd = context.sparkContext().emptyRDD();
    newDStream.foreachRDD(new SaxFunction<JavaRDD<Object>, Void>() {
        private static final long serialVersionUID = 672054140484217234L;

        @Override
        public Void execute(JavaRDD<Object> object) throws Exception {
            rdd.union(object);
            return null;
        }
    });
    return rdd;
}
Exception:
Caused by: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:42)
at org.apache.spark.streaming.dstream.QueueInputDStream.<init>(QueueInputDStream.scala:29)
at org.apache.spark.streaming.StreamingContext.queueStream(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.queueStream(StreamingContext.scala:492)
at org.apache.spark.streaming.api.java.JavaStreamingContext.queueStream(JavaStreamingContext.scala:436)
Is there any way to create a DStream and operate on it at runtime, or can I update the DAG after the context has started? Thanks in advance.