Mat*_*hew 5 java dataflow google-cloud-platform google-cloud-pubsub
我正在尝试参照下面的示例，在 Dataflow 中从 GCP Pub/Sub 读取数据。
import avro.shaded.com.google.common.collect.Lists;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
/**
 * Streaming pipeline: reads text messages from a Pub/Sub topic, counts the words in each message,
 * sums those counts over sliding windows and appends one row per window to a BigQuery table.
 *
 * <p>Credentials: both the Pub/Sub source and the BigQuery sink call Google APIs, and the BigQuery
 * write already checks that the output dataset exists while {@code p.run()} validates the
 * pipeline. A local run therefore fails with "Unable to get application default credentials"
 * unless Application Default Credentials are available, e.g. after
 * {@code gcloud auth application-default login}, or with {@code GOOGLE_APPLICATION_CREDENTIALS}
 * exported to point at an existing service-account JSON key file.
 */
public class StreamDemoConsumer {

  /** Runs of whitespace that separate words; compiled once rather than for every element. */
  private static final Pattern WHITESPACE = Pattern.compile("\\s+");

  /** Pipeline options; override the defaults with {@code --output=...} and {@code --input=...}. */
  public interface MyOptions extends DataflowPipelineOptions {
    @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
    @Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
    String getOutput();

    void setOutput(String s);

    @Description("Input topic")
    @Default.String("projects/coexon-seoul-dev/topics/trading")
    String getInput();

    void setInput(String s);
  }

  /**
   * Counts the whitespace-separated words in {@code line}.
   *
   * <p>Unlike {@code line.split(" ").length}, leading/trailing whitespace and runs of several
   * spaces or tabs do not produce empty "words", and a blank (or null) line counts as zero.
   *
   * @param line one Pub/Sub message payload
   * @return number of words, {@code >= 0}
   */
  private static int countWords(String line) {
    if (line == null) {
      return 0;
    }
    String trimmed = line.trim();
    return trimmed.isEmpty() ? 0 : WHITESPACE.split(trimmed).length;
  }

  /**
   * Builds the streaming pipeline from the command-line options and runs it.
   *
   * @param args pipeline options, e.g. {@code --project=... --runner=DataflowRunner}
   */
  @SuppressWarnings("serial") // the anonymous DoFns below are Serializable without serialVersionUID
  public static void main(String[] args) throws IOException {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);

    String topic = options.getInput();
    String output = options.getOutput();

    // Build the table schema for the output table.
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
    fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    p.apply("GetMessages", PubsubIO.readStrings().fromTopic(topic))
        // 2-minute windows starting every 30 seconds, so each message falls into 4 windows.
        .apply("window",
            Window.into(SlidingWindows
                .of(Duration.standardMinutes(2))
                .every(Duration.standardSeconds(30))))
        .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            c.output(countWords(c.element()));
          }
        }))
        // withoutDefaults(): windows that received no input emit nothing instead of a default sum.
        .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults())
        .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            TableRow row = new TableRow();
            // Wall-clock time at which the row is built, not the window's bounds.
            // NOTE(review): use the window end instead if rows must identify their window.
            row.set("timestamp", Instant.now().toString());
            row.set("num_words", c.element());
            c.output(row);
          }
        }))
        .apply(BigQueryIO.writeTableRows().to(output)
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  }
}
Run Code Online (Sandbox Code Playgroud)
我使用以下命令运行此代码。
sh run_oncloud4.sh coexon-seoul-dev ledgerbucket
Run Code Online (Sandbox Code Playgroud)
代码在 Dataflow 上运行正常。
run_oncloud4.sh 如下
#!/bin/bash
# Compiles the project with Maven and launches StreamDemoConsumer on Cloud Dataflow.
#
# Usage: run_oncloud4.sh PROJECT BUCKET
#   PROJECT  GCP project id; also selects the output table PROJECT:demos.streamdemo
#            and the input topic projects/PROJECT/topics/streamdemo
#   BUCKET   GCS bucket used for staging and temp files (gs://BUCKET/staging/)
if [ "$#" -ne 2 ]; then
  echo "Usage: $0 project-name bucket-name" >&2
  echo "Example: $0 cloud-training-demos cloud-training-demos" >&2
  exit 1
fi

PROJECT="$1"
BUCKET="$2"
MAIN=com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer

echo "project=$PROJECT bucket=$BUCKET main=$MAIN"

# Prefer the Java 8 JDK at this path when it is installed.
export PATH="/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH"

# exec.args is a single string that Maven splits into the program's arguments.
mvn compile -e exec:java \
  -Dexec.mainClass="$MAIN" \
  -Dexec.args="--project=$PROJECT \
  --stagingLocation=gs://$BUCKET/staging/ \
  --tempLocation=gs://$BUCKET/staging/ \
  --output=$PROJECT:demos.streamdemo \
  --input=projects/$PROJECT/topics/streamdemo \
  --runner=DataflowRunner"
Run Code Online (Sandbox Code Playgroud)
但当我按如下方式在本地运行上述代码时：
sh run_locally.sh com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer
Run Code Online (Sandbox Code Playgroud)
就会出现"无法获取应用程序默认凭据"（Unable to get application default credentials）的错误：
>SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>SLF4J: Defaulting to no-operation (NOP) logger implementation
>SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>Exception in thread "main" java.lang.RuntimeException: Unable to get application default credentials. Please see https://developers.google.com/accounts/docs/application-default-credentials for details on how to specify credentials. This version of the SDK is dependent on the gcloud core component version 2015.02.05 or newer to be able to get credentials from the currently authorized user via gcloud auth.
> at org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer.throwNullCredentialException(NullCredentialInitializer.java:60)
> at org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer$NullCredentialHttpUnsuccessfulResponseHandler.handleResponse(NullCredentialInitializer.java:53)
> at com.google.cloud.hadoop.util.ChainingHttpRequestInitializer$3.handleResponse(ChainingHttpRequestInitializer.java:111)
> at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1015)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:854)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.getDataset(BigQueryServicesImpl.java:554)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.verifyDatasetPresence(BigQueryHelpers.java:196)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validate(BigQueryIO.java:1486)
> at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:640)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:656)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer.main(StreamDemoConsumer.java:115)
>
>Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)
run_locally.sh
>SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>SLF4J: Defaulting to no-operation (NOP) logger implementation
>SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>Exception in thread "main" java.lang.RuntimeException: Unable to get application default credentials. Please see https://developers.google.com/accounts/docs/application-default-credentials for details on how to specify credentials. This version of the SDK is dependent on the gcloud core component version 2015.02.05 or newer to be able to get credentials from the currently authorized user via gcloud auth.
> at org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer.throwNullCredentialException(NullCredentialInitializer.java:60)
> at org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer$NullCredentialHttpUnsuccessfulResponseHandler.handleResponse(NullCredentialInitializer.java:53)
> at com.google.cloud.hadoop.util.ChainingHttpRequestInitializer$3.handleResponse(ChainingHttpRequestInitializer.java:111)
> at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1015)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:854)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.getDataset(BigQueryServicesImpl.java:554)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.verifyDatasetPresence(BigQueryHelpers.java:196)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validate(BigQueryIO.java:1486)
> at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:640)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:656)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer.main(StreamDemoConsumer.java:115)
>
>Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)
我已经设置了凭证
#!/bin/bash
# Compiles the project with Maven and runs one javahelp main class locally.
# No pipeline arguments are passed, so the class's own option defaults apply.
#
# Usage: run_locally.sh MAINCLASS
#   MAINCLASS  either a basename in com.google.cloud.training.dataanalyst.javahelp
#              (e.g. Grep) or a fully-qualified class name.
#
# Pipelines that read Pub/Sub or write BigQuery still need Application Default
# Credentials: run `gcloud auth application-default login`, or export
# GOOGLE_APPLICATION_CREDENTIALS pointing at an existing service-account key file.
if [ "$#" -ne 1 ]; then
  echo "Usage: $0 mainclass-basename" >&2
  echo "Example: $0 Grep" >&2
  exit 1
fi

case "$1" in
  *.*) MAIN="$1" ;;                                              # already fully qualified
  *)   MAIN="com.google.cloud.training.dataanalyst.javahelp.$1" ;;
esac

# A mistyped key path otherwise surfaces only as a generic credentials error at run time.
if [ -n "$GOOGLE_APPLICATION_CREDENTIALS" ] && [ ! -r "$GOOGLE_APPLICATION_CREDENTIALS" ]; then
  echo "Warning: GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS is not a readable file" >&2
fi

export PATH="/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH"
mvn compile -e exec:java -Dexec.mainClass="$MAIN"
Run Code Online (Sandbox Code Playgroud)
/Users/mattheu/coexon-seoul-dev-898d91a66539.json
但出现授权错误。
我怎么解决这个问题?
对我们来说，下面的方法有效：
% gcloud auth application-default login
Run Code Online (Sandbox Code Playgroud)
来自文档:
如果您开发的代码通常以服务帐户身份运行，但需要在本地开发环境中运行（在本地提供用户凭据更方便），此命令就非常有用。
无需设置GOOGLE_APPLICATION_CREDENTIALS。引用最佳实践:
开发期间不要使用服务帐户。在日常工作中，您可能会使用 Google Cloud CLI、gsutil 或 terraform 等工具，不要用服务帐户来运行这些工具；而应先运行 gcloud auth login（适用于 gcloud CLI 和 gsutil）或 gcloud auth application-default login（适用于 terraform 和其他第三方工具），让这些工具使用您自己的凭据。
我经历过类似的事情,以下步骤对我有用:
在您的电脑上安装 Google Cloud SDK。安装说明见：https://cloud.google.com/sdk/install
关闭并重新打开终端（命令行窗口）。
运行gcloud init并按照说明进行操作,其中包括将 SDK 绑定到您的 GCP 帐户和项目。
按照手动设置服务帐户的说明进行操作 ( https://cloud.google.com/docs/authentication/Production#obtaining_and_providing_service_account_credentials_manually )。您只需按照“手动获取并提供服务帐户凭据”下的说明进行操作即可。基本上,您将使用您的服务帐户信息将文件保存到您的计算机上,该文件将用于您尝试执行的工作。
在您的 shell 配置文件中（在 macOS Catalina 及更高版本上为 ~/.zshenv）添加以下一行：export GOOGLE_APPLICATION_CREDENTIALS="/path/to/the/file/you/saved/in/step/4"
关闭并重新打开 shell（终端），即可正常使用。
我有点不确定是否有必要设置 SDK(步骤 1-3),但无论如何进行该设置都很好。
| 归档时间: |
|
| 查看次数: |
6510 次 |
| 最近记录: |