使用pyspark脚本从bigquery加载表到spark集群

swe*_*eet 4 python google-bigquery apache-spark pyspark google-cloud-dataproc

我有一个在bigquery中加载的数据表,我想通过pyspark .py文件在我的spark集群中导入它.

我在Dataproc + BigQuery示例中看到过- 任何可用的?有一种方法可以使用scala在spark集群中加载一个bigquery表,但有没有办法在pyspark脚本中执行此操作?

Jam*_*mes 5

这个问题来自@MattJ .这是一个连接到Spark中的BigQuery并执行字数统计的示例.

import json
import pyspark
sc = pyspark.SparkContext()

hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")

conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>",
    "mapred.bq.input.project.id": "publicdata", 
    "mapred.bq.input.dataset.id":"samples", 
    "mapred.bq.input.table.id": "shakespeare"  }

tableData = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", 
    conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"],
    int(x["word_count"]))).reduceByKey(lambda x,y: x+y)

print tableData.take(10)
Run Code Online (Sandbox Code Playgroud)

您需要更改<project_id><bucket>匹配项目的设置.