小编Nag*_*han的帖子

无法从工厂方法 DataflowRunner#fromOptions in beamSql 中构造实例,apache beam

我在下面的beamSql程序中指定了数据流运行器:

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setStagingLocation("gs://gcpbucket/staging");
    options.setTempLocation("gs://gcpbucket/tmp");
    options.setProject("beta-19xxxx");
    options.setRunner(DataflowRunner.class);
    Pipeline p = Pipeline.create(options);
Run Code Online (Sandbox Code Playgroud)

但是我遇到了以下异常:

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:233)
at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:150)
at my.proj.StarterPipeline.main(StarterPipeline.java:34)Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:222)
... 4 more Caused by: java.lang.IllegalArgumentException: Unable to use ClassLoader to detect classpath elements. Current ClassLoader is jdk.internal.loader.ClassLoaders$AppClassLoader@782830e, only URLClassLoaders are supported.
at org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage(PipelineResources.java:43)
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:262) …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
5175
查看次数

在 Apache Beam 中读取 CSV 文件时跳过标头

我想跳过 CSV 文件中的标题行。截至目前,我正在手动删除标头,然后将其加载到谷歌存储。

下面是我的代码:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));
Run Code Online (Sandbox Code Playgroud)

我已经检查了 stackoverflow 中的现有问题,但我认为它没有希望:Skipping header rows - is it possible with Cloud DataFlow?

有什么帮助吗?

编辑:我尝试过类似下面的方法并且它有效:

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-platform google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
8531
查看次数

ImportError:无法从“itsdangerous”导入名称“TimedJSONWebSignatureSerializer”

我正在 AWS EC2 实例中使用其危险的 python 包运行 Flask 应用程序。

Traceback (most recent call last):
  File "run.py", line 4, in <module>
    app = create_app()
  File "/home/ubuntu/RHS_US/application/portal/__init__.py", line 29, in create_app
    from portal.users.routes import users
  File "/home/ubuntu/RHS_US/application/portal/users/routes.py", line 7, in <module>
    from portal.models import User
  File "/home/ubuntu/RHS_US/application/portal/models.py", line 7, in <module>
    from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
ImportError: cannot import name 'TimedJSONWebSignatureSerializer' from 'itsdangerous' (/home/ubuntu/.local/lib/python3.7/site-packages/itsdangerous/__init__.py)
Run Code Online (Sandbox Code Playgroud)

有解决办法吗?

python ubuntu

5
推荐指数
3
解决办法
1万
查看次数

如何在apache Beam中使用Pandas?

如何在Apache Beam中实现Pandas?我无法在多个列上执行左联接,并且Pcollections不支持sql查询。甚至Apache Beam文档也没有正确地构建框架。我检查了一下,但是在Apache Beam中找不到任何熊猫实现。谁能将我定向到所需的链接?

join pandas google-cloud-dataflow apache-beam

3
推荐指数
2
解决办法
2607
查看次数