我是 spark 新手,正在尝试使用 df.write 加快将数据帧的内容(可能有 200k 到 2M 行)附加到 postgres 数据库的速度:
df.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
Run Code Online (Sandbox Code Playgroud)
我尝试增加批量,但这没有帮助,因为完成这项任务仍然需要大约 4 小时。我还在下面包含了来自 aws emr 的一些快照,显示了有关作业运行方式的更多详细信息。将数据帧保存到 postgres 表的任务只分配给了一个执行器(我觉得很奇怪),加速这个任务是否涉及在执行器之间分配这个任务?
另外,我已经阅读了spark 的性能调优文档,但增加了batchsize, 并且queryTimeout似乎没有提高性能。(我之前尝试调用df.cache()我的脚本df.write,但脚本的运行时间仍然是 4 小时)
此外,我的 aws emr 硬件设置spark-submit是:
主节点(1):m4.xlarge
核心节点(2):m5.xlarge
spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...
Run Code Online (Sandbox Code Playgroud)
postgresql apache-spark apache-spark-sql pyspark pyspark-sql
我正在将 Spark 数据帧写入 bigquery 表。这是可行的,但现在我在将数据写入 bigquery 之前调用 pandas udf。由于某种原因,当我在将 Spark 数据帧写入 bigquery 之前调用 pandas udf 时,我现在看到以下错误:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
arrs = [create_array(s, t) for s, t in series] …Run Code Online (Sandbox Code Playgroud) 我已经为我的 bigquery 项目创建了一个client idand client secret,但我不知道如何使用它们将数据帧从 pyspark 脚本成功保存到我的 bigquery 表中。我的下面的 python 代码会导致以下错误。有没有办法使用 pyspark 数据帧上的保存选项连接到 BigQuery?
代码
df.write \
.format("bigquery") \
.option("client_id", "<MY_CLIENT_ID>") \
.option("client_secret", "<MY_CLIENT_SECRET>") \
.option("project", "bigquery-project-id") \
.option("table", "dataset.table") \
.save()
Run Code Online (Sandbox Code Playgroud)
错误
py4j.protocol.Py4JJavaError:调用 o93.save 时发生错误。:com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException:400错误请求{“error”:“invalid_grant”,“error_description”:“错误请求”}位于com.google.cloud .spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:106) 位于 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery。 spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:268) 位于 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:664) 位于 com.google .cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:661) 在 com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying .DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105) 位于 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76) 位于 com.google.cloud.spark.bigquery .repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:660)在 com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable(BigQueryInsertableRelation.scala:68) 在 com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists(BigQueryInsertableRelation.scala:54) 在 com.google.cloud.spark .bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:86) 在 org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec .sideEffectResult$lzycompute(commands.scala:70) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute (commands.scala:86) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$在 org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) …
我已经在Google云上创建了一个mysql数据库,我想从一个单独的节点Web应用程序(也在Google Cloud上运行)访问该数据库。我首先在计算机上本地测试连接,然后在本地运行以下代码时,我可以成功建立与数据库的连接并查看其中的数据。
'use strict';
// [START app]
const express = require('express');
const bodyParser = require('body-parser');
const path = require('path');
const app = express();
const mysql = require('mysql');
var connection = mysql.createConnection({
host : 'Cloud SQL IP',
user : 'username',
password : 'password',
database : 'db_name'
});
// parse application/x-www-form-urlencoded
app.use(bodyParser.urlencoded({ extended: false }))
// parse application/json
app.use(bodyParser.json())
// Make globals.js accessible
app.use(express.static(__dirname + '/'));
app.get('/', (req, res) => {
connection.connect();
connection.query('SELECT * FROM Users', function (error, results, fields) {
if …Run Code Online (Sandbox Code Playgroud) 我正在尝试将 Spark 数据帧从 AWS EMR 集群保存到 BigQuery 表中。我正在使用Spark-bigquery-connector来执行此操作。我已从命令行将 gcloud 凭据服务 json 文件编码为 Base64,然后只需粘贴选项字符串credentials。但这不起作用,并会导致下面的编码错误。我知道我的 json 文件是正确的,因为我在本地运行脚本时使用它。是什么导致了这个问题?
GCLOUD 服务凭证 JSON 文件结构
{
"type": "service_account",
"project_id": "<MY_PROJECT_NAME>",
"private_key_id": "<PRIVATE_KEY_ID>",
"private_key": "-----BEGIN PRIVATE KEY-----<LONG LIST OF CHARS>-----END PRIVATE KEY-----\n",
"client_email": "service@project.iam.gserviceaccount.com",
"client_id": "<CLIENT_ID>",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/<service>%40<project>.iam.gserviceaccount.com"
}
Run Code Online (Sandbox Code Playgroud)
火花代码
df \
.drop(*cols_to_drop) \
.write \
.format("bigquery") \
.option("temporaryGcsBucket", "emr_spark") \
.option("credentials", "<LONG_BASE64_STRING>") \
.option("project", "<MY_PROJECT_NAME>") \
.option("parentProject", "<MY_PROJECT_NAME>") \
.option("table", "<MY_PROJECT_NAME>:dataset.table") \
.mode("overwrite") \
.save()
Run Code Online (Sandbox Code Playgroud)
错误: …
我使用Microsoft的Bot Framework创建了一个聊天机器人,并使用Bot Emulator进行了测试.现在我想将这个聊天机器人集成到我的网站上,该网站也托管在本地服务器上.我在dev.botframework.com上注册了这个机器人.
apache-spark ×4
pyspark ×4
asp.net ×1
botframework ×1
gcloud ×1
mysql ×1
node.js ×1
postgresql ×1
pyspark-sql ×1