我想用一些数据运行机器学习模型。在用这些数据训练模型之前,我需要处理它,所以我一直在阅读一些方法来做到这一点。
首先创建一个 Dataflow 管道将其上传到 Bigquery 或 Google Cloud Storage,然后使用 Google Dataprep 创建一个数据管道来清理它。
我想这样做的另一种方法是使用数据融合,它可以更轻松地创建数据管道,但我不知道,这是我的疑问,数据融合只是创建像 Dataflow 这样的管道,然后我必须使用DataPrep 来清理数据,或者 Data Fusion 是否可以清理数据并准备将其放入我的机器学习模型中。
如果 Data Fusion 可以将数据清理为 DataPrep,那么我应该什么时候使用 DataPrep?
google-cloud-platform google-cloud-dataflow google-cloud-dataprep google-cloud-data-fusion
我的目标是通过 Cloud Data Fusion 管道将数据从 Cloud SQL Postgres 读取到 BigQuery。
为此,我设置了一个 Cloud Data Fusion 实例并为服务帐户分配了以下两个权限:(请参阅https://cloud.google.com/data-fusion/docs/how-to/create-instance#setting_up_permissions)
作为下一步,我将自己连接到 Cloud Data Fusion 实例,并导航到Wrangler -> Add Connection -> Database -> Google Cloud SQL for PostgreSQL
.
作为驱动程序,我上传了postgres-socket-factory-1.0.13-jar-with-dependencies.jar
我在此处下载的驱动程序:https : //github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory
对于驱动程序配置,我设置:
对于数据库连接,我设置:
<PROJECT_NAME>:<REGION>:<INSTANCE_CONNECTION_NAME>
jdbc:postgresql://google/<DATABASE_NAME>?cloudSqlInstance=<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.postgres.SocketFactory
单击测试连接后,我收到org.postgresql.Driver错误消息。
我正在使用数据融合创建一个管道,该管道会将CSV数据从GCS加载到BigQuery。当我进行预览时,效果很好。但是,当我部署管道时,它给了我下面的错误。
ERROR io.cdap.cdap.internal.provision.task.ProvisioningTask#151-provisioning-service-13 DEPROVISION task failed in REQUESTING_DELETE state for program run program_run:default.gcstobqsample.-SNAPSHOT.workflow.DataPipelineWorkflow.31a8341b-70d6-11e9-9c94-92fdc3807015.
com.google.api.gax.rpc.FailedPreconditionException: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Cannot delete cluster 'cdap-gcstobqsa-31a8341b-70d6-11e9-9c94-92fdc3807015' while it has other pending delete operations.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:59) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[na:na]
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:95) ~[na:na]
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:61) ~[na:na]
at com.google.common.util.concurrent.Futures$4.run(Futures.java:1123) ~[com.google.guava.guava-13.0.1.jar:na]
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:435) ~[na:na]
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900) ~[com.google.guava.guava-13.0.1.jar:na]
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:811) ~[com.google.guava.guava-13.0.1.jar:na]
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:675) ~[com.google.guava.guava-13.0.1.jar:na]
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:492) ~[na:na]
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:467) ~[na:na]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41) ~[na:na]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:684) ~[na:na]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:41) ~[na:na]
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:392) ~[na:na]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:475) ~[na:na]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) …
Run Code Online (Sandbox Code Playgroud) 我在 europe-west1-b 中创建了一个基本实例。我尝试连接 2 个 BigQuery 表中的数据并将结果写回 BigQuery。我收到此错误:java.io.IOException:无法在不同位置读写:来源:欧盟,目的地:美国
自动创建的临时存储桶位于美国,而 Cloud Data Fusion 实例和 BigQuery 表位于欧盟。
我通过在正确的区域中手动创建一个存储桶并在接收器/源中指定这些来解决这个问题。
是否可以通过自动选择正确的区域来避免此手动步骤?
我正在评估用于内部项目的 Google Cloud Data Fusion,我希望能够设置 Data Fusion 实例、定义和部署计划管道,然后关闭 Data Fusion 实例。但是,当实例关闭时,我仍然希望计划的管道继续按计划运行。这可能吗?
我已通读 Google Data Fusion 文档并了解如何设置预定管道,但我不清楚该管道一旦创建,如何与实例相关联。一些文档听起来像是“操作”和“实例”是独立的实体,这让我觉得我想做的事情可能是可能的,但我仍然不清楚如何做到这一点。
尝试构建管道以从 3rd 方 REST API 端点数据源读取。
我正在使用 Hub 中的 HTTP(1.2.0 版)插件。
响应请求 URL 是: https://api.example.io/v2/somedata?return_count=false
响应体示例:
{
"paging": {
"token": "12456789",
"next": "https://api.example.io/v2/somedata?return_count=false&__paging_token=123456789"
},
"data": [
{
"cID": "aerrfaerrf",
"first": true,
"_id": "aerfaerrfaerrf",
"action": "aerrfaerrf",
"time": "1970-10-09T14:48:29+0000",
"email": "example@aol.com"
},
{...}
]
}
Run Code Online (Sandbox Code Playgroud)
日志中的主要错误是:
java.lang.NullPointerException: null
at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.getNextPage(BaseHttpPaginationIterator.java:118) ~[1580429892615-0/:na]
at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.ensurePageIterable(BaseHttpPaginationIterator.java:161) ~[1580429892615-0/:na]
at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.hasNext(BaseHttpPaginationIterator.java:203) ~[1580429892615-0/:na]
at io.cdap.plugin.http.source.batch.HttpRecordReader.nextKeyValue(HttpRecordReader.java:60) ~[1580429892615-0/:na]
at io.cdap.cdap.etl.batch.preview.LimitingRecordReader.nextKeyValue(LimitingRecordReader.java:51) ~[cdap-etl-core-6.1.1.jar:na]
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:214) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at …
Run Code Online (Sandbox Code Playgroud) 我的生产管道仅使用 Google Data Fusion 运行几个小时。我想停止数据融合实例并在第二天启动它。我没有看到停止实例的选项。无论如何,我们可以停止实例并再次启动同一个实例吗?
我正在按照Cloud Data Fusion示例教程中的说明进行操作,并且一切似乎都正常进行,直到尝试在最后运行管道为止。根据说明为Google托管服务帐户设置了Cloud Data Fusion Service API权限。管道预览功能可以正常工作。
但是,当我部署并运行管道时,它在几分钟后失败了。在状态从配置更改为运行之后不久,管道就会停止,并出现以下权限错误:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "xxxxxxxxxxx-compute@developer.gserviceaccount.com does not have storage.buckets.create access to project X.",
"reason" : "forbidden"
} ],
"message" : "xxxxxxxxxxx-compute@developer.gserviceaccount.com does not have storage.buckets.create access to project X."
}
Run Code Online (Sandbox Code Playgroud)
xxxxxxxxxxx-compute@developer.gserviceaccount.com是我的项目的默认Compute Engine服务帐户。
不过,“ Project X”不是我的项目之一,我不知道为什么管道启动代码试图在此处创建一个存储桶,它确实在我的计算机中成功创建了临时存储桶(一个称为df-xxx和一个称为dataproc-xxx)项目失败之前。
我已经尝试使用两个单独的帐户执行此操作,并且在两个地方都遇到相同的错误。我曾尝试将存储/管理员角色添加到各种服务帐户,但无济于事,但这是在我意识到它试图完全访问另一个项目之前。
Cloud Data Fusion为每个运行的管道创建一个新的Dataproc集群。我已经有一个运行24x7的Dataproc集群设置,我想使用该集群来运行管道
我尝试从 Cloud Data Fusion 运行管道,但收到以下错误:
io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException: Dataproc operation failure: INVALID_ARGUMENT: User not authorized to act as service account 'XXXXXXXX-compute@developer.gserviceaccount.com'. To act as a service account, user must have one of [Owner, Editor, Service Account Actor] roles. See https://cloud.google.com/iam/docs/understanding-service-accounts for additional details.
Run Code Online (Sandbox Code Playgroud)
有人已经遇到过这个错误吗?