Apache Beam返回"输入值不得以任何方式变异".当使用当地直接跑步者

Kas*_*ady 4 google-cloud-platform google-cloud-dataflow apache-beam

我写了一个Apache Beam DoFn

static class FillLocation extends DoFn<TrackingRequest, TrackingRequest> {
        @ProcessElement
        public void processElement(ProcessContext c) {    
            TrackingRequest rq = c.element();
            rq.location = getLocationFromIP(rq.IP);         
            c.output(rq);
        }
}
Run Code Online (Sandbox Code Playgroud)

当它在本地测试它时,它给了我这个错误PTransform ..非法变异值...类.....

 Input values must not be mutated in any way.
    at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.verifyUnmodified(ImmutabilityEnforcementFactory.java:96)
    at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.afterElement(ImmutabilityEnforcementFactory.java:71)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:149)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

M S*_*ong 6

您的函数修改了输入TrackingRequest元素的位置字段.Dataflow不允许这样做.

文档说:

输入PCollection的当前元素由c.element()返回.它应该被认为是不可改变的.Dataflow运行时不会改变元素,因此可以安全地进行缓存等.任何DoFn方法都不应该改变元素,因为它可以缓存在其他地方,由Dataflow运行时保留,或者以其他未指定的方式使用.

您可以创建输入元素的副本,修改字段,并将副本作为输出发送出去.

  • 当你突变一个元素时,检测起来有些昂贵.因此,正是测试`DirectRunner`检查这一点,以便尽早捕获管道中的错误.如果您在Google Cloud Dataflow上执行此类突变,您实际上可能会随机获得不正确的结果! (3认同)