标签: apache-spark-xml

为什么Spark中repartition比partitionBy更快?

我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件(90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件中(分割)。现在让\xe2\x80\x99s 假设我们的目标是 100 个分区,并且给定设备数据显示在同一个输出文件(只是同一个分区)中并不重要。

\n

考虑到这个问题,我们\xe2\x80\x99想出了两种方法来做到这一点 - repartitionthenwritewritewithpartitionBy应用于Writer. 其中任何一个的代码都非常简单:

\n

repartition(添加哈希列是为了确保与partitionBy下面的代码的比较是一对一的):

\n
\ndf = spark.read.format("xml") \\\n  .options(rowTag="DeviceData") \\\n  .load(file_path, schema=meter_data) \\\n  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \\\n  .repartition("partition") \\\n  .write.format("json") \\\n  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \\\n  .mode("overwrite") \\\n  .save(output_path)\n\n
Run Code Online (Sandbox Code Playgroud)\n

partitionBy:

\n
\ndf = spark.read.format("xml") \\\n  .options(rowTag="DeviceData") \\\n  .load(file_path, schema=meter_data) \\\n  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \\\n  .write.format("json") \\\n  .partitionBy(\xe2\x80\x9cpartition\xe2\x80\x9d) \\\n  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \\\n  .mode("overwrite") \\\n  .save(output_path)\n\n
Run Code Online (Sandbox Code Playgroud)\n

在我们的测试中, …

apache-spark apache-spark-sql pyspark apache-spark-xml

9
推荐指数
2
解决办法
7104
查看次数

在Spark 2.1.0中读取大文件时出现内存不足错误

我想使用spark将大型(51GB)XML文件(在外部硬盘上)读入数据帧(使用spark-xml插件),进行简单的映射/过滤,重新排序,然后将其写回磁盘,作为CSV文件.

java.lang.OutOfMemoryError: Java heap space无论我如何调整它,我总是得到一个.

我想了解为什么不增加分区数量来阻止OOM错误

它不应该将任务分成更多部分,以便每个部分都更小并且不会导致内存问题吗?

(Spark可能不会尝试将所有东西都填入内存并且如果它不适合就会崩溃,对吧?)

我试过的事情:

  • 在读取和写入时对数据帧进行重新分区/合并(5,000和10,000分区)(初始值为1,604)
  • 使用较少数量的执行程序(6,4 ,即使有2个执行程序,我也会收到OOM错误!)
  • 减少拆分文件的大小(默认看起来像是33MB)
  • 给大量的RAM(我所拥有的)
  • 增加到spark.memory.fraction0.8(默认值为0.6)
  • 减少spark.memory.storageFraction到0.2(默认为0.5)
  • 设置spark.default.parallelism为30和40(对我来说默认为8)
  • 设置spark.files.maxPartitionBytes为64M(默认为128M)

我的所有代码都在这里(注意我没有缓存任何东西):

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)

// filter and select only the cols i'm interested in
val dsout = df2
  .where( …
Run Code Online (Sandbox Code Playgroud)

xml scala apache-spark apache-spark-2.0 apache-spark-xml

5
推荐指数
1
解决办法
2169
查看次数

如何解析包含 xml 字符串的数据框?

如何解析包含其中一列中的 xml 数据的 xml 文件?

在我们的一个项目中,我们收到 xml 文件,其中一些列存储另一个 xml。在将此数据加载到数据帧时,内部 xml 被转换为StringType(这不是故意的),因此在查询数据时无法访问节点(使用点运算符)。

我已经在网上生动地四处寻找答案,但没有运气。在 GitHub 中发现了一个与我的用例完全相同的未解决问题。链接在这里。

https://github.com/databricks/spark-xml/issues/140

我的 xml 源文件如下所示。

+------+--------------------+
|  id  |               xml  |
+------+--------------------+
| 6723 |<?xml version="1....|
| 6741 |<?xml version="1....|
| 6774 |<?xml version="1....|
| 6735 |<?xml version="1....|
| 6828 |<?xml version="1....|
| 6764 |<?xml version="1....|
| 6732 |<?xml version="1....|
| 6792 |<?xml version="1....|
| 6754 |<?xml version="1....|
| 6833 |<?xml version="1....|
+------+--------------------+
Run Code Online (Sandbox Code Playgroud)

在 SQL Server 中,为了将 xml 存储在数据库列中,有XML数据类型,但 Spark SQL 中不存在相同的数据类型。

有没有人遇到同样的问题并找到任何解决方法?如果是,请分享。我们正在使用 …

apache-spark-xml

5
推荐指数
1
解决办法
2520
查看次数

如何使用Spark-Xml生成复杂的XML

我正在尝试从JavaRDd <Book>和JavaRdd <Reviews>生成一个复杂的xml,我如何将这两个结合在一起以在xml之下生成?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>
Run Code Online (Sandbox Code Playgroud)

如您所见,有一个父根库,其中包含子书和评论。

以下是我如何生成Book and Review Dataframe

DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);
Run Code Online (Sandbox Code Playgroud)

我知道要生成xml,而我的疑问尤其是对于拥有Library rootTag以及将Books and Reviews作为其子元素。

我正在使用Java。但是如果您可以指出正确的内容,则可以编写Scala或Python示例。

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset apache-spark-xml

5
推荐指数
1
解决办法
1252
查看次数

在 spark 中读取 XML

我正在尝试使用 spark-xml jar 在 pyspark 中读取 xml/嵌套 xml。

df = sqlContext.read \
  .format("com.databricks.spark.xml")\
   .option("rowTag", "hierachy")\
   .load("test.xml"
Run Code Online (Sandbox Code Playgroud)

当我执行时,数据框没有正确创建。

    +--------------------+
    |                 att|
    +--------------------+
    |[[1,Data,[Wrapped...|
    +--------------------+
Run Code Online (Sandbox Code Playgroud)

下面提到了我的 xml 格式:

在此处输入图片说明

xml dataframe apache-spark pyspark apache-spark-xml

3
推荐指数
1
解决办法
3万
查看次数

如何在Scala/Spark中扩展Dataframe中的数组

我使用Databricks spark-xml包将一个XML文件读入Spark.该文件具有以下数据结构:

<lib>
 <element>
  <genre>Thriller</genre>
  <dates>
   <date>2000-10-01</date>
   <date>2020-10-01</date>
  </dates>
 </element>
 <element>
  <genre>SciFi</genre>
  <dates>
   <date>2015-10-01</date>
  </dates>
 </element>
</lib>    
Run Code Online (Sandbox Code Playgroud)

加载数据后,我得到一个看起来像这样的数据框:

root
 |-- genre: string (nullable = true)
 |-- publish_dates: struct (nullable = true)
 |    |-- publish_date: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)

并且"show"产生以下结果:

gerne | dates
Th... | [WrappedArray(20...
Run Code Online (Sandbox Code Playgroud)

是否有可能以下列形式获得此结果:

gerne | date
Th... | 2000-...
Th... | 2020-...
Sci.. | 2015-...
Run Code Online (Sandbox Code Playgroud)

我已经尝试过:

val rdd = df.select("genre", "dates").rdd.map(row => (row(0), row(1))).flatMapValues(_.toString)
Run Code Online (Sandbox Code Playgroud)

val rdd = df.select("genre", …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-xml

0
推荐指数
1
解决办法
3012
查看次数