小编Pau*_*vis的帖子

如何将多处理池分配给 Spark Worker

我正在尝试使用多重处理并行读取 100 个 csv 文件(然后分别并行处理它们)。以下是我在 AWS 中的 EMR 主节点上托管的 Jupyter 中运行的代码。(最终将是 100k csv 文件,因此需要分布式读取)。

import findspark
import boto3
from multiprocessing.pool import ThreadPool
import logging
import sys
findspark.init()
from pyspark import SparkContext, SparkConf, sql
conf = SparkConf().setMaster("local[*]")
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext.getOrCreate(conf)
spark = sql.SparkSession.builder.master("local[*]").appName("ETL").getOrCreate()
s3 = boto3.resource(...)
bucket = ''
bucketObj = s3.Bucket(bucket)
numNodes = 64
def processTest(key):
    logger.info(key + ' ---- Start\n')
    fLog = spark.read.option("header", "true") \
                         .option("inferSchema", "true") \
                         .csv(buildS3Path(bucket) + key)
    logger.info(key + ' ---- Finish Read\n') …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing apache-spark pyspark

5
推荐指数
1
解决办法
4887
查看次数

Pandas加入String数据类型

我想在id字段上加入两个pandas数据帧,这是一个字符串uuid.我收到一个Value错误:

ValueError:您正在尝试合并object和int64列.如果您想继续,请使用pd.concat

代码如下.我试图将字段转换为字符串按照尝试合并2个数据帧但得到ValueError但错误仍然存​​在.请注意,pdf来自火花,dataframe.toPandas()而outputsPdf是从字典创建的.

pdf.id = pdf.id.apply(str)
outputsPdf.id = outputsPdf.id.apply(str)
inOutPdf = pdf.join(outputsPdf, on='id', how='left', rsuffix='fs')

pdf.dtypes
id         object
time      float64
height    float32
dtype: object

outputsPdf.dtypes
id         object
labels    float64
dtype: object
Run Code Online (Sandbox Code Playgroud)

我该怎么调试呢?完全追溯:

ValueError                                Traceback (most recent call last)
<ipython-input-13-deb429dde9ad> in <module>()
     61 pdf['id'] = pdf['id'].astype(str)
     62 outputsPdf['id'] = outputsPdf['id'].astype(str)
---> 63 inOutPdf = pdf.join(outputsPdf, on=['id'], how='left', rsuffix='fs')
     64 
     65 # idSparkDf = spark.createDataFrame(idPandasDf, schema=StructType([StructField('id', StringType(), True),

~/miniconda3/lib/python3.6/site-packages/pandas/core/frame.py in join(self, other, on, how, lsuffix, rsuffix, sort)
   6334 …
Run Code Online (Sandbox Code Playgroud)

python pandas

4
推荐指数
1
解决办法
1315
查看次数

从S3存储桶读取文件到PySpark Dataframe Boto3

如何将S3存储桶中的一堆文件加载到单个PySpark数据帧中?我在EMR实例上运行。如果文件是本地文件,则可以使用SparkContext textFile方法。但是,当文件位于S3上时,如何使用boto3将多个类型(CSV,JSON等)的多个文件加载到单个数据框中进行处理?

amazon-s3 apache-spark boto3 pyspark

1
推荐指数
1
解决办法
5339
查看次数

BigQuery - 在同一查询的另一部分中引用子查询的结果

我经常在我的查询中遇到以下问题,我在这个问题中构建了一个大型复杂的子查询,我想在同一个查询中引用多次(但可能会稍微过滤一下):

Select * FROM (BIG SUB QUERY WHERE field='one') as a
INNER JOIN 
(SAME BIG SUB QUERY WHERE field = 'two') as b
ON a.id = b.id
Run Code Online (Sandbox Code Playgroud)

我想避免使用临时表,因为我正在使用BigQuery并且只想执行一个查询(为了速度).有没有办法保持子查询并在别处引用它们?如果我复制并粘贴BIG SUB QUERY文本,Big Query是否会被优化以有效地运行几乎相同的SUB QUERIES?

sql google-bigquery

0
推荐指数
1
解决办法
1258
查看次数