Sri*_*ari 4 python gcloud google-cloud-dataproc
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import gc
import pandas as pd
import datetime
import numpy as np
import sys
APP_NAME = "DataFrameToCSV"
spark = SparkSession\
.builder\
.appName(APP_NAME)\
.config("spark.sql.crossJoin.enabled","true")\
.getOrCreate()
group_ids = [1,1,1,1,1,1,1,2,2,2,2,2,2,2]
dates = ["2016-04-01","2016-04-01","2016-04-01","2016-04-20","2016-04-20","2016-04-28","2016-04-28","2016-04-05","2016-04-05","2016-04-05","2016-04-05","2016-04-20","2016-04-20","2016-04-29"]
#event = [0,1,0,0,0,0,1,1,0,0,0,0,1,0]
event = [0,1,1,0,1,0,1,0,0,1,0,0,0,0]
dataFrameArr = np.column_stack((group_ids,dates,event))
df = pd.DataFrame(dataFrameArr,columns = ["group_ids","dates","event"])
Run Code Online (Sandbox Code Playgroud)
上面的 python 代码将在 gcloud dataproc 上的 spark 集群上运行。我想在 gs://mybucket/csv_data/ 的 gcloud 存储桶中将 Pandas 数据帧保存为 csv 文件
我该怎么做呢?
您也可以将此解决方案与 Dask 一起使用。您可以将 DataFrame 转换为 Dask DataFrame,后者可以写入 Cloud Storage 上的 csv
import dask.dataframe as dd
import pandas
df # your Pandas DataFrame
ddf = dd.from_pandas(df,npartitions=1, sort=True)
ddf.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,
storage_options={'token': gcs.session.credentials})
Run Code Online (Sandbox Code Playgroud)
storage_options 参数是可选的
| 归档时间: |
|
| 查看次数: |
3286 次 |
| 最近记录: |