I specified the Dataflow runner in the Beam SQL program below:
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setStagingLocation("gs://gcpbucket/staging");
options.setTempLocation("gs://gcpbucket/tmp");
options.setProject("beta-19xxxx");
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
But I am getting the following exception:
Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:233)
at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:150)
at my.proj.StarterPipeline.main(StarterPipeline.java:34)
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:222)
... 4 more
Caused by: java.lang.IllegalArgumentException: Unable to use ClassLoader to detect classpath elements. Current ClassLoader is jdk.internal.loader.ClassLoaders$AppClassLoader@782830e, only URLClassLoaders are supported.
at org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage(PipelineResources.java:43)
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:262) …

I want to skip the header row of a CSV file. For now, I am removing the header manually and then loading the file into Google Storage.
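A note on the ClassLoader exception above: on Java 9 and later the application class loader is no longer a `URLClassLoader`, which this Beam version relies on to detect the classpath elements to stage. The usual remedies are running the pipeline on Java 8, upgrading to a Beam release that handles this, or (if the SDK version supports it) passing the jars to stage explicitly via a `filesToStage` pipeline option. Later SDKs enumerate the classpath from the `java.class.path` system property instead, roughly like this stdlib-only sketch (the class and method names here are illustrative, not Beam's):

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ClasspathScan {
    // Read classpath entries from the java.class.path property instead of
    // casting the context ClassLoader to URLClassLoader, which throws
    // ClassCastException / IllegalArgumentException on Java 9+.
    static List<String> detectClasspathElements() {
        String cp = System.getProperty("java.class.path", "");
        return Arrays.stream(cp.split(File.pathSeparator))
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        detectClasspathElements().forEach(System.out::println);
    }
}
```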
Below is my code:
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into class type
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] strArr = c.element().split(",");
        ClassFinance fin = new ClassFinance();
        fin.setBeneficiaryFinance(strArr[0]);
        fin.setCatlibCode(strArr[1]);
        fin.set_rNR_(Double.valueOf(strArr[2]));
        fin.set_rNCS_(Double.valueOf(strArr[3]));
        fin.set_rCtb_(Double.valueOf(strArr[4]));
        fin.set_rAC_(Double.valueOf(strArr[5]));
        c.output(fin);
    }
}));
I have checked the existing questions on Stack Overflow, but I don't think they look promising: Skipping header rows - is it possible with Cloud DataFlow?
Any help?
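One common pattern for this (a sketch, not the only option): drop any line that matches the known header before parsing, either inside the DoFn or with a `Filter.by` transform ahead of it. The header string below is an assumption about what Financials.csv contains, inferred from the setters in the question; the predicate itself is plain Java and works the same either way:

```java
public class CsvHeader {
    // Assumed header line of Financials.csv; adjust to the real file.
    static final String HEADER = "BeneficiaryFinance,CatlibCode,rNR,rNCS,rCtb,rAC";

    // True for data rows, false for the header row, so the DoFn can
    // simply return without emitting when it sees the header.
    static boolean isDataRow(String line) {
        return !line.trim().equals(HEADER);
    }

    public static void main(String[] args) {
        System.out.println(isDataRow(HEADER));                     // false
        System.out.println(isDataRow("acme,C1,1.0,2.0,3.0,4.0"));  // true
    }
}
```

In the DoFn this becomes `if (!isDataRow(c.element())) return;` as the first line of `processElement`, so the header is never split or emitted.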
EDIT: I tried something like the following and it works:
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String …
I am running a Flask app on an AWS EC2 instance that uses the itsdangerous Python package.
Traceback (most recent call last):
File "run.py", line 4, in <module>
app = create_app()
File "/home/ubuntu/RHS_US/application/portal/__init__.py", line 29, in create_app
from portal.users.routes import users
File "/home/ubuntu/RHS_US/application/portal/users/routes.py", line 7, in <module>
from portal.models import User
File "/home/ubuntu/RHS_US/application/portal/models.py", line 7, in <module>
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
ImportError: cannot import name 'TimedJSONWebSignatureSerializer' from 'itsdangerous' (/home/ubuntu/.local/lib/python3.7/site-packages/itsdangerous/__init__.py)
Is there a solution for this?
How can I get Pandas-like functionality in Apache Beam? I cannot perform a left join on multiple columns, and PCollections do not support SQL queries. Even the Apache Beam documentation does not lay this out properly. I looked, but could not find any Pandas implementation in Apache Beam. Can anyone point me to the relevant link?
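There is no drop-in Pandas inside the Beam Java SDK; the usual route for a multi-column left join is to key both PCollections by a composite key and combine them with CoGroupByKey (or the Join helpers in Beam's extensions). The relational logic that grouping implements can be sketched with plain collections; the row layout here (two key columns followed by one value column) is made up for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftJoinSketch {
    // Composite key: join on two columns at once by concatenating them
    // with a separator that cannot appear in the data.
    static String key(String col1, String col2) {
        return col1 + "\u0001" + col2;
    }

    // Left join: every left row appears once per matching right row,
    // or once with a null value when the right side has no match.
    static List<String[]> leftJoin(List<String[]> left, List<String[]> right) {
        Map<String, List<String[]>> rightByKey = new HashMap<>();
        for (String[] r : right) {
            rightByKey.computeIfAbsent(key(r[0], r[1]), k -> new ArrayList<>()).add(r);
        }
        List<String[]> out = new ArrayList<>();
        for (String[] l : left) {
            List<String[]> matches =
                    rightByKey.getOrDefault(key(l[0], l[1]), Collections.emptyList());
            if (matches.isEmpty()) {
                out.add(new String[] { l[0], l[1], l[2], null });
            } else {
                for (String[] r : matches) {
                    out.add(new String[] { l[0], l[1], l[2], r[2] });
                }
            }
        }
        return out;
    }
}
```

In Beam, the same shape becomes two `PCollection<KV<String, ...>>` built with the composite key, fed into `CoGroupByKey`, with a DoFn that emits each left row alongside its (possibly empty) bag of right-side matches.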