我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner像
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)
这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用
Rangepartitioner,然后它分为31个元素和33个元素.
我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?
问题:我想使用以下方法将数据从S3导入Spark EMR:
data = sqlContext.read.json("s3n://.....")
Run Code Online (Sandbox Code Playgroud)
有没有办法可以设置Spark用来加载和处理数据的节点数量?这是我处理数据的示例:
data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")
Run Code Online (Sandbox Code Playgroud)
上下文:数据不是太大,需要很长时间才能加载到Spark中,也需要查询.我认为Spark将数据划分为太多节点.我希望能够手动设置.我知道在处理RDD时sc.parallelize我可以将分区数作为输入传递.此外,我已经看到了repartition(),但我不确定它是否可以解决我的问题.在我的例子中,变量data是一个DataFrame.
让我更准确地定义分区.定义一个:通常被称为"分区键",其中一列中选择和索引,以加快查询(这不是我想要的).定义二:(这是我关注的地方)假设你有一个数据集,Spark决定它将它分布在许多节点上,以便它可以并行地对数据进行操作.如果数据量太小,这可能会进一步减慢进程.我该如何设置该值
我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.
语境:
运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)
我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.
之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)
在read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.
问题:
df2是按列分区的numerocarte?社区!
请帮助我了解如何使用 Spark 获得更好的压缩率?
让我描述一下案例:
我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,有84 列
我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:
~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。
代码片段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
摄取:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY …Run Code Online (Sandbox Code Playgroud) snappy apache-spark parquet apache-spark-sql spark-dataframe
我试图从mysql读取数据并将其写回s3中具有特定分区的镶木地板文件,如下所示:
df=sqlContext.read.format('jdbc')\
.options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
dbtable='tbl',
numPartitions=4 )\
.load()
df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])
Run Code Online (Sandbox Code Playgroud)
我的问题是它只打开一个与mysql的连接(而不是4),并且它不会写入parquert,直到它从mysql中获取所有数据,因为我在mysql中的表很大(100M行),这个过程在OutOfMemory上失败了.
有没有办法配置Spark来打开多个与mysql的连接并将部分数据写入镶木地板?
我在这里浏览了文档:https : //spark.apache.org/docs/latest/api/python/pyspark.sql.html
它说:
和前面的问题也提到了它。但是,我仍然不明白它们究竟有何不同,以及在选择其中一个时会产生什么影响?
更重要的是,如果 repartition 进行哈希分区,提供列作为其参数有什么影响?
考虑的方法(Spark 2.2.1):
DataFrame.repartition(带partitionExprs: Column*参数的两个实现)DataFrameWriter.partitionBy从文档的partitionBy:
如果指定,输出奠定了类似文件系统
Hive的分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:
- 年= 2016 /月= 01 /
- 年= 2016 /月= 02 /
由此,我推断列参数的顺序将决定目录布局; 因此它是相关的.
从文档的repartition:
返回
Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset是散列分区.
根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.
所有事情都说,我的疑虑如下:
partitionBy方法一样,列输入的顺序也 …我有一个 DataFrame(转换为 RDD)并且想要重新分区,以便每个键(第一列)都有自己的分区。这就是我所做的:
# Repartition to # key partitions and map each row to a partition given their key rank
my_rdd = df.rdd.partitionBy(len(keys), lambda row: int(row[0]))
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试将其映射回 DataFrame 或保存它时,我收到此错误:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
for obj in iterator:
File "spark-1.5.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key
for k, v in iterator:
ValueError: too many values to unpack …Run Code Online (Sandbox Code Playgroud) 试图了解 Hive 分区与 Spark 分区的关系,最终解决了一个关于连接的问题。
我有 2 个外部 Hive 表;均由 S3 存储桶支持并由 分区date;所以在每个存储桶中都有名称为 format 的键date=<yyyy-MM-dd>/<filename>。
问题 1:
如果我将此数据读入 Spark:
val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]
Run Code Online (Sandbox Code Playgroud)
那么结果数据集将分别有多少个分区?分区等于 S3 中的对象数量?
问题2:
假设这两种行类型具有以下架构:
Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)
Run Code Online (Sandbox Code Playgroud)
并且我想加入table1和table2在领域date和id:
table1.joinWith(table2,
table1("date") === table2("date") &&
table1("id") === table2("id")
)
Run Code Online (Sandbox Code Playgroud)
Spark 是否能够利用被连接的字段之一是 Hive 表中的分区键来优化连接?如果是这样怎么办?
问题 3:
假设现在我正在使用RDDs 代替:
val rdd1 = table1.rdd …Run Code Online (Sandbox Code Playgroud) 假设我有一个带有列的DataFrame partition_id:
n_partitions = 2
df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
Run Code Online (Sandbox Code Playgroud)
我如何重新分区DataFrame以保证每个值partition_id都转到一个单独的分区,并且实际分区的数量与不同的值完全相同partition_id?
如果我执行散列分区,即df.repartition(n_partitions, 'partition_id')保证分区数量正确,但某些分区可能为空,而其他分区可能包含多个partition_id由于散列冲突引起的值.
嗨,我有几个Spark作业,每天处理数千个文件。文件大小可能从MB到GB。完成工作后,我通常使用以下代码保存
finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
dataFrame.write.format("orc").save("/path/in/hdfs") //storing as ORC file as of Spark 1.4
Run Code Online (Sandbox Code Playgroud)
Spark作业会在最终输出目录中创建大量小零件文件。据我了解,Spark为每个分区/任务创建零件文件,如果我错了,请纠正我。我们如何控制Spark创建的零件文件数量?最后,我想使用这些parquet / orc目录创建Hive表,并且听说在没有大量小文件的情况下Hive运行缓慢。请指导我是Spark的新手。提前致谢。
apache-spark ×11
pyspark ×4
partitioning ×3
hive ×2
parquet ×2
dataframe ×1
hadoop ×1
mysql ×1
pyspark-sql ×1
python ×1
rdd ×1
scala ×1
snappy ×1
sql ×1