java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported

Ami*_*mar 5 apache-spark

When I try to create a DStream inside a Spark Function call, I get the following exception.

My call method:

@Override
public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
    Queue<JavaRDD<Object>> queue = new LinkedList<>();
    queue.add(v1);
    // The exception is thrown here: the streaming context is already running.
    JavaDStream<Object> dStream = context.queueStream(queue);
    JavaDStream<Object> newDStream = dStream.map(AbstractProcessor.this);
    final JavaRDD<Object> rdd = context.sparkContext().emptyRDD();
    newDStream.foreachRDD(new SaxFunction<JavaRDD<Object>, Void>() {
        private static final long serialVersionUID = 672054140484217234L;

        @Override
        public Void execute(JavaRDD<Object> object) throws Exception {
            // Note: union() returns a new RDD; the result is discarded here.
            rdd.union(object);
            return null;
        }
    });
    return rdd;
}

Exception:

Caused by: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:42)
    at org.apache.spark.streaming.dstream.QueueInputDStream.<init>(QueueInputDStream.scala:29)
    at org.apache.spark.streaming.StreamingContext.queueStream(StreamingContext.scala:513)
    at org.apache.spark.streaming.StreamingContext.queueStream(StreamingContext.scala:492)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.queueStream(JavaStreamingContext.scala:436)

Is there a way to create a DStream and operate on it at runtime, or can I update the DAG after the context has started? Thanks in advance.
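
For what it's worth, the stack trace suggests the check happens in `DStream.validateAtInit`, which is reached from `queueStream`. The sketch below is a simplified, Spark-free model of that rule, not Spark's actual code: `MiniStreamingContext`, `FrozenGraphDemo`, and `registeredStreams` are hypothetical names made up for illustration. It only shows the general idea that streams can be registered before `start()` and are rejected afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in only; this is NOT Spark's implementation.
class FrozenGraphDemo {
    public static void main(String[] args) {
        MiniStreamingContext ctx = new MiniStreamingContext();
        ctx.queueStream("declared-before-start");   // allowed: the graph is still open
        ctx.start();                                 // from here on the graph is frozen
        try {
            // Mirrors calling queueStream from inside call() while batches are running.
            ctx.queueStream("declared-inside-a-batch");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println("Registered streams: " + ctx.registeredStreams());
    }
}

// Hypothetical miniature of a streaming context with a two-state lifecycle.
final class MiniStreamingContext {
    private enum State { INITIALIZED, ACTIVE }

    private State state = State.INITIALIZED;
    private final List<String> streams = new ArrayList<>();

    // Registers a stream by name; only legal before start().
    void queueStream(String name) {
        validateAtInit();
        streams.add(name);
    }

    // Marks the context as running; no further registrations are accepted.
    void start() {
        state = State.ACTIVE;
    }

    // Returns a defensive copy of the registered stream names.
    List<String> registeredStreams() {
        return new ArrayList<>(streams);
    }

    // Assumed shape of the guard: reject any change once the context is active.
    private void validateAtInit() {
        if (state != State.INITIALIZED) {
            throw new IllegalStateException(
                "Adding new inputs, transformations, and output operations "
                + "after starting a context is not supported");
        }
    }
}
```

If that model is roughly right, every DStream input, transformation, and output operation has to be declared before the context is started. In the code above, `v1` is already a `JavaRDD<Object>`, so one option that may work is to skip the new DStream entirely and apply the transformation to the RDD itself, for example `v1.map(AbstractProcessor.this)`, assuming `AbstractProcessor` implements the `Function` interface that `JavaRDD.map` expects.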