我正在 PySpark 中迈出第一步,目前我正在研究 UDF 和 pandas UDF。我读过几个论坛,他们或多或少都同意“pandas UDF允许矢量化操作,与一次一行的 Python UDF 相比,可以将性能提高高达 100 倍”。因此,pandas UDF 成为一个有趣的主题。
\n对于我的测试,我有以下虚拟数据:
\nimport pandas as pd\nimport time\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import udf, col, pandas_udf\nfrom pyspark.sql.types import ArrayType, StringType\n\nspark = SparkSession.builder \\\n .appName(\'SpacyOverPySpark\') \\\n .config(\'spark.jars.packages\', \'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2\') \\\n .getOrCreate()\n\n# - Running in a GCP Dataproc Workbench Jupyter Notebook\n# - Data being imported from GCP Cloud Storage\n# - They\'re basically text paragraphs or sentences, no nulls\ndf = spark.read.csv("gs://my_bucket/data_sample.csv", header=True)\n\nprint("DataFrame shape: ", (df.count(), …Run Code Online (Sandbox Code Playgroud) 我logs.txt在Compute Engine VM Instance 的某个位置有一个文件。我想定期在Google Cloud Storage 存储桶中备份(即覆盖)。由于是在 Python 脚本内进行一些预处理的结果,我还想使用该脚本将该文件上传/复制到 Google Cloud Storage 存储桶中(因此,不能将其视为一个选项)。Compute Engine 虚拟机实例和 Cloud Storage 存储桶都位于同一个 GCP 项目中,因此“它们可以看到彼此”。基于此示例代码,我现在正在尝试的内容如下所示:logs.txtlogs.txtcp
from google.cloud import storage
bucket_name = "my-bucket"
destination_blob_name = "logs.txt"
source_file_name = "logs.txt" # accessible from this script
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
generation_match_precondition = 0
blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
Run Code Online (Sandbox Code Playgroud)
如果gs://my-bucket/logs.txt不存在,脚本可以正常工作,但是如果我尝试覆盖,则会出现以下错误:
Traceback (most …Run Code Online (Sandbox Code Playgroud) python google-cloud-storage google-compute-engine google-cloud-platform
我想知道将 Google BigQuery 表查询结果存储到 Google Cloud 存储的最佳方式是什么。我的代码目前正在一些 Jupyter Notebook 中运行(在 Vertex AI Workbench 中,与 BigQuery 数据源以及 Cloud Storage 目标相同的项目),如下所示:
# CELL 1 OF 2
from google.cloud import bigquery
bqclient = bigquery.Client()
# The query string can vary:
query_string = """
SELECT *
FROM `my_project-name.my_db.my_table`
LIMIT 2000000
"""
dataframe = (
bqclient.query(query_string)
.result()
.to_dataframe(
create_bqstorage_client=True,
)
)
print("Dataframe shape: ", dataframe.shape)
# CELL 2 OF 2:
import pandas as pd
dataframe.to_csv('gs://my_bucket/test_file.csv', index=False)
Run Code Online (Sandbox Code Playgroud)
此代码大约需要 7.5 分钟才能成功完成。
是否有更优化的方法来实现上面所做的事情?(这意味着更快,但也许其他方面还可以改进)。
一些附加说明:
google-cloud-storage google-bigquery google-cloud-platform google-cloud-api-gateway google-cloud-vertex-ai