我在我的本地计算机上写了一个火花作业,它使用google hadoop连接器从谷歌云存储中读取文件,如gs://storage.googleapis.com/,如https://cloud.google.com/dataproc/docs/中所述连接器/云的存储
我已经设置了具有计算引擎和存储权限的服务帐户.我的火花配置和代码是
SparkConf conf = new SparkConf();
conf.setAppName("SparkAPp").setMaster("local");
conf.set("google.cloud.auth.service.account.enable", "true");
conf.set("google.cloud.auth.service.account.email", "xxx-compute@developer.gserviceaccount.com");
conf.set("google.cloud.auth.service.account.keyfile", "/root/Documents/xxx-compute-e71ddbafd13e.p12");
conf.set("fs.gs.project.id", "xxx-990711");
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
SparkContext sparkContext = new SparkContext(conf);
JavaRDD<String> data = sparkContext.textFile("gs://storage.googleapis.com/xxx/xxx.txt", 0).toJavaRDD();
data.foreach(line -> System.out.println(line));
Run Code Online (Sandbox Code Playgroud)
我已经设置了名为GOOGLE_APPLICATION_CREDENTIALS的环境变量,该变量指向密钥文件.我尝试过使用两个密钥文件,即json和P12.但无法访问该文件.我得到的错误是
java.net.UnknownHostException: metadata
java.io.IOException: Error getting access token from metadata server at: http://metadata/computeMetadata/v1/instance/service-accounts/default/token
at com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:208)
at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:70)
Run Code Online (Sandbox Code Playgroud)
我正在使用java 8,spark 2.2.0依赖项和gcs-connector 1.6.1.hadoop2从eclipse运行我的工作.我只需要使用服务帐户连接,而不是通过OAuth机制连接.
提前致谢
java google-cloud-storage service-accounts apache-spark google-cloud-dataproc
Airflow 中的 Dataproc Spark 运算符如何返回值以及如何捕获该值。
我有一个下游作业来捕获此结果,并根据返回的值,我必须通过分支操作员触发另一个作业。
apache-spark google-cloud-dataproc airflow google-cloud-composer
我有一个 Google Cloud 的 JSON 密钥文件,格式如下:
{
"type": "service_account",
"project_id": "###",
"private_key_id": "###",
"private_key": "-----BEGIN PRIVATE KEY-----\n
########################################
\n-----END PRIVATE KEY-----\n",
"client_email": "###@###.gserviceaccount.com",
"client_id": "###",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/###.gserviceaccount.com"
}
Run Code Online (Sandbox Code Playgroud)
我想access_token
使用常规 curl 命令而不是GCP Console或安装gcloud 工具来获得一段时间。
我希望是这样的:
{
"type": "service_account",
"project_id": "###",
"private_key_id": "###",
"private_key": "-----BEGIN PRIVATE KEY-----\n
########################################
\n-----END PRIVATE KEY-----\n",
"client_email": "###@###.gserviceaccount.com",
"client_id": "###",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/###.gserviceaccount.com"
}
Run Code Online (Sandbox Code Playgroud) 出于某种原因,我想安装与Google Cloud Dataproc上提供的版本不同的Apache Spark版本。如何安装 Spark 的自定义版本,同时保持与 Cloud Dataproc 工具的兼容性?
我正在尝试使用 .NET Spark 作业提交 Dataproc。
命令行如下所示:
gcloud dataproc jobs submit spark \
--cluster=<cluster> \
--region=<region> \
--class=org.apache.spark.deploy.dotnet.DotnetRunner \
--jars=gs://bucket/microsoft-spark-2.4.x-0.11.0.jar \
--archives=gs://bucket/dotnet-build-output.zip \
-- find
Run Code Online (Sandbox Code Playgroud)
该命令行应调用find
函数来显示当前目录中的文件。
我只看到 2 个文件:
././microsoft-spark-2.4.x-0.11.0.jar
././microsoft-spark-2.4.x-0.11.0.jar.crc
Run Code Online (Sandbox Code Playgroud)
最终,GCP 不会从指定为 的存储中解压文件--archives
。指定的文件存在,并且路径是从 GCP UI 复制的。我还尝试从存档(存在)中运行一个精确的程序集文件,但它合理地失败了File does not exist
.net apache-spark google-cloud-platform google-cloud-dataproc
完成所有作业后,如何以编程方式自动关闭Google Dataproc集群?
Dataproc 提供创建,监视和管理。但是看来我找不到如何删除群集。
我们目前正在测试基于Spark在Python中实现LDA的预测引擎: https://spark.apache.org/docs/2.2.0/ml-clustering.html#latent-dirichlet-allocation-lda https:// spark .apache.org/docs/2.2.0/api/python/pyspark.ml.html #pyspark.ml.clustering.LDA (我们使用的是pyspark.ml包,而不是pyspark.mllib)
我们能够成功地在Spark集群上训练模型(使用Google Cloud Dataproc).现在我们尝试使用该模型作为API(例如烧瓶应用程序)提供实时预测.
实现目标的最佳方法是什么?
我们的主要痛点是,我们似乎需要恢复整个Spark环境,以便加载训练好的模型并运行转换.到目前为止,我们已尝试为每个收到的请求以本地模式运行Spark,但这种方法给了我们:
整个方法看起来很重,有没有更简单的替代方案,甚至根本不需要暗示Spark的方法?
波纹管是训练和预测步骤的简化代码.
def train(input_dataset):
conf = pyspark.SparkConf().setAppName("lda-train")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Generate count vectors
count_vectorizer = CountVectorizer(...)
vectorizer_model = count_vectorizer.fit(input_dataset)
vectorized_dataset = vectorizer_model.transform(input_dataset)
# Instantiate LDA model
lda = LDA(k=100, maxIter=100, optimizer="em", ...)
# Train LDA model
lda_model = lda.fit(vectorized_dataset)
# Save models to external storage
vectorizer_model.write().overwrite().save("gs://...")
lda_model.write().overwrite().save("gs://...")
Run Code Online (Sandbox Code Playgroud)
def predict(input_query):
conf = pyspark.SparkConf().setAppName("lda-predict").setMaster("local")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Load models from external storage
vectorizer_model …
Run Code Online (Sandbox Code Playgroud) 我在 Google Compute Engine 上创建了两个集群,这些集群读取了 100 GB 的数据。
集群 I:1 个主 - 15 GB 内存 - 250 GB 磁盘 10 个节点 - 7.5 GB 内存 - 200 GB 磁盘
集群 II:1 个主 - 15 GB 内存 - 250 GB 磁盘 150 个节点 - 1.7 GB 内存 - 200 GB 磁盘
我正在使用它来读取文件:
val df = spark.read.format("csv")
.option("inferSchema", true)
.option("maxColumns",900000)
.load("hdfs://master:9000/tmp/test.csv")
Run Code Online (Sandbox Code Playgroud)
这也是一个包含 55k 行和 850k 列的数据集。
Q1:虽然我增加了机器数量,但我没有看到阅读速度有显着提高。有什么问题或怎么做才能使这个过程更快?我应该更多地增加节点吗?
Q2:机器数量的增加对更快更重要还是内存量的增加对Spark很重要?节点、内存和速度之间是否有性能图?
Q3:hadoop 的复制或移动命令也运行得很慢。数据只有 100 GB。大公司如何处理 TB 级数据?我无法捕捉到数据读取速度的增加。
谢谢你的回答
我有一个在 Dataproc 集群上运行的 Spark 作业。如何配置环境以使用 IDE 在本地计算机上进行调试?
我在从SAS声明Java对象时遇到麻烦。无法设置Java对象的参数数组或创建数组的对象。
data _NULL_;
/*With primitive types work well */
declare javaobj jArr("java.util.Arrays");
array primeNum{3} (15, 2, 3);
jArr_rc1=jArr.callStaticVoidMethod("sort", primeNum);
/* For example, try to create array from some javaobjs */
declare javaobj num1("java.lang.Integer","15");
declare javaobj num2("java.lang.Integer","2");
declare javaobj num3("java.lang.Integer","3");
array Num{3} (num1,num2,num3);
/* ERROR: Cannot create an array of objects. */
/* ERROR: DATA STEP Component Object failure. Aborted during the COMPILATION phase. */
/*In my project I've tried different ways: */
/* ...callStaticVoidMethod("FuncFromStandartClass",[num1,num2,num3]); */
/* ...callStaticVoidMethod("FuncFromStandartClass",{num1,num2,num3}); */
/* This …
Run Code Online (Sandbox Code Playgroud)