相关疑难解决方法(0)

如何为同等大小的分区的Spark RDD定义自定义分区程序,其中每个分区具有相同数量的元素?

我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)

这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用 Rangepartitioner,然后它分为31个元素和33个元素.

我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?

hadoop scala apache-spark

27
推荐指数
2
解决办法
4万
查看次数

将数据导入Spark时如何设置分区/节点数

问题:我想使用以下方法将数据从S3导入Spark EMR:

data = sqlContext.read.json("s3n://.....")
Run Code Online (Sandbox Code Playgroud)

有没有办法可以设置Spark用来加载处理数据的节点数量?这是我处理数据的示例:

data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")
Run Code Online (Sandbox Code Playgroud)

上下文:数据不是太大,需要很长时间才能加载到Spark中,也需要查询.我认为Spark将数据划分为太多节点.我希望能够手动设置.我知道在处理RDD时sc.parallelize我可以将分区数作为输入传递.此外,我已经看到了repartition(),但我不确定它是否可以解决我的问题.在我的例子中,变量data是一个DataFrame.

让我更准确地定义分区.定义一个:通常被称为"分区键",其中一列中选择和索引,以加快查询(这不是我想要的).定义二:(这是我关注的地方)假设你有一个数据集,Spark决定它将它分布在许多节点上,以便它可以并行地对数据进行操作.如果数据量太小,这可能会进一步减慢进程.我该如何设置该值

sql database-partitioning apache-spark pyspark-sql

14
推荐指数
2
解决办法
2万
查看次数

Spark知道DataFrame的分区键吗?

我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.

语境:

运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:

val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)

我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.

之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
      sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)

read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.

问题:

  1. Spark是否知道数据框df2是按列分区的numerocarte
  2. 如果它知道,那么窗口函数中将没有随机播放.真正? …

partitioning window-functions apache-spark

14
推荐指数
2
解决办法
1万
查看次数

Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降

社区!

请帮助我了解如何使用 Spark 获得更好的压缩率?

让我描述一下案例:

  1. 我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,84 列

  2. 我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:

~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。

代码片段:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
  1. 使用镶木地板工具,我查看了摄取和处理的随机文件,它们如下所示:

摄取:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY …
Run Code Online (Sandbox Code Playgroud)

snappy apache-spark parquet apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
1万
查看次数

同时从mysql中读取数据

我试图从mysql读取数据并将其写回s3中具有特定分区的镶木地板文件,如下所示:

df=sqlContext.read.format('jdbc')\
   .options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
         dbtable='tbl',
         numPartitions=4 )\
   .load()


df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])
Run Code Online (Sandbox Code Playgroud)

我的问题是它只打开一个与mysql的连接(而不是4),并且它不会写入parquert,直到它从mysql中获取所有数据,因为我在mysql中的表很大(100M行),这个过程在OutOfMemory上失败了.

有没有办法配置Spark来打开多个与mysql的连接并将部分数据写入镶木地板?

mysql apache-spark apache-spark-sql pyspark

9
推荐指数
2
解决办法
1万
查看次数

Spark:repartition 和 repartitionByRange 有什么区别?

我在这里浏览了文档:https : //spark.apache.org/docs/latest/api/python/pyspark.sql.html

它说:

  • 用于重新分区:生成的 DataFrame 是散列分区的。
  • 对于 repartitionByRange:生成的 DataFrame 是范围分区的。

前面的问题也提到了它。但是,我仍然不明白它们究竟有何不同,以及在选择其中一个时会产生什么影响?

更重要的是,如果 repartition 进行哈希分区,提供列作为其参数有什么影响?

apache-spark apache-spark-sql pyspark

7
推荐指数
2
解决办法
2406
查看次数

Spark:重新分区与partitionBy中的列参数顺序

考虑的方法(Spark 2.2.1):

  1. DataFrame.repartition(带partitionExprs: Column*参数的两个实现)
  2. DataFrameWriter.partitionBy

注意:这个问题不会问这些方法之间的区别

文档partitionBy:

如果指定,输出奠定了类似文件系统Hive分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:

  • 年= 2016 /月= 01 /
  • 年= 2016 /月= 02 /

由此,我推断列参数顺序将决定目录布局; 因此它是相关的.

文档repartition:

返回Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset散列分区.

根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.


所有事情都说,我的疑虑如下:

  • partitionBy方法一样,输入的顺序也 …

partitioning dataframe apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4808
查看次数

pyspark:重新分区后出现“太多值”错误

我有一个 DataFrame(转换为 RDD)并且想要重新分区,以便每个键(第一列)都有自己的分区。这就是我所做的:

# Repartition to # key partitions and map each row to a partition given their key rank
my_rdd = df.rdd.partitionBy(len(keys), lambda row: int(row[0]))
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试将其映射回 DataFrame 或保存它时,我收到此错误:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",     line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "spark-1.5.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key
    for k, v in iterator:
ValueError: too many values to unpack …
Run Code Online (Sandbox Code Playgroud)

python apache-spark rdd apache-spark-sql pyspark

5
推荐指数
1
解决办法
3166
查看次数

Hive 分区、Spark 分区和 Spark 中的连接 - 它们之间的关系

试图了解 Hive 分区与 Spark 分区的关系,最终解决了一个关于连接的问题。

我有 2 个外部 Hive 表;均由 S3 存储桶支持并由 分区date;所以在每个存储桶中都有名称为 format 的键date=<yyyy-MM-dd>/<filename>

问题 1:

如果我将此数据读入 Spark:

val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]
Run Code Online (Sandbox Code Playgroud)

那么结果数据集将分别有多少个分区?分区等于 S3 中的对象数量?

问题2

假设这两种行类型具有以下架构:

Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)
Run Code Online (Sandbox Code Playgroud)

并且我想加入table1table2在领域dateid

table1.joinWith(table2,
  table1("date") === table2("date") && 
    table1("id") === table2("id")
)
Run Code Online (Sandbox Code Playgroud)

Spark 是否能够利用被连接的字段之一是 Hive 表中的分区键来优化连接?如果是这样怎么办?

问题 3

假设现在我正在使用RDDs 代替:

val rdd1 = table1.rdd …
Run Code Online (Sandbox Code Playgroud)

hive apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
3372
查看次数

如何在PySpark DataFrame中强制进行某个分区?

假设我有一个带有列的DataFrame partition_id:

n_partitions = 2

df = spark.sparkContext.parallelize([
    [1, 'A'],
    [1, 'B'],
    [2, 'A'],
    [2, 'C']
]).toDF(('partition_id', 'val'))
Run Code Online (Sandbox Code Playgroud)

我如何重新分区DataFrame以保证每个值partition_id都转到一个单独的分区,并且实际分区的数量与不同的值完全相同partition_id

如果我执行散列分区,即df.repartition(n_partitions, 'partition_id')保证分区数量正确,但某些分区可能为空,而其他分区可能包含多个partition_id由于散列冲突引起的值.

partitioning apache-spark pyspark

4
推荐指数
2
解决办法
2596
查看次数

写入时如何控制Spark作业创建的输出零件文件的数量?

嗨,我有几个Spark作业,每天处理数千个文件。文件大小可能从MB到GB。完成工作后,我通常使用以下代码保存

finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
dataFrame.write.format("orc").save("/path/in/hdfs") //storing as ORC file as of Spark 1.4
Run Code Online (Sandbox Code Playgroud)

Spark作业会在最终输出目录中创建大量小零件文件。据我了解,Spark为每个分区/任务创建零件文件,如果我错了,请纠正我。我们如何控制Spark创建的零件文件数量?最后,我想使用这些parquet / orc目录创建Hive表,并且听说在没有大量小文件的情况下Hive运行缓慢。请指导我是Spark的新手。提前致谢。

hive apache-spark parquet apache-spark-sql

2
推荐指数
1
解决办法
9172
查看次数