我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件(90k),其中包含数百万台设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件中(分割)。现在让\xe2\x80\x99s 假设我们的目标是 100 个分区,并且给定设备数据显示在同一个输出文件(只是同一个分区)中并不重要。
\n考虑到这个问题,我们\xe2\x80\x99想出了两种方法来做到这一点 - repartitionthenwrite或writewithpartitionBy应用于Writer. 其中任何一个的代码都非常简单:
repartition(添加哈希列是为了确保与partitionBy下面的代码的比较是一对一的):
\ndf = spark.read.format("xml") \\\n .options(rowTag="DeviceData") \\\n .load(file_path, schema=meter_data) \\\n .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \\\n .repartition("partition") \\\n .write.format("json") \\\n .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \\\n .mode("overwrite") \\\n .save(output_path)\n\nRun Code Online (Sandbox Code Playgroud)\npartitionBy:
\ndf = spark.read.format("xml") \\\n .options(rowTag="DeviceData") \\\n .load(file_path, schema=meter_data) \\\n .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \\\n .write.format("json") \\\n .partitionBy(\xe2\x80\x9cpartition\xe2\x80\x9d) \\\n .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \\\n .mode("overwrite") \\\n .save(output_path)\n\nRun Code Online (Sandbox Code Playgroud)\n在我们的测试中, …
我想使用spark将大型(51GB)XML文件(在外部硬盘上)读入数据帧(使用spark-xml插件),进行简单的映射/过滤,重新排序,然后将其写回磁盘,作为CSV文件.
但java.lang.OutOfMemoryError: Java heap space无论我如何调整它,我总是得到一个.
我想了解为什么不增加分区数量来阻止OOM错误
它不应该将任务分成更多部分,以便每个部分都更小并且不会导致内存问题吗?
(Spark可能不会尝试将所有东西都填入内存并且如果它不适合就会崩溃,对吧?)
我试过的事情:
spark.memory.fraction0.8(默认值为0.6)spark.memory.storageFraction到0.2(默认为0.5)spark.default.parallelism为30和40(对我来说默认为8)spark.files.maxPartitionBytes为64M(默认为128M)我的所有代码都在这里(注意我没有缓存任何东西):
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where( …Run Code Online (Sandbox Code Playgroud) 如何解析包含其中一列中的 xml 数据的 xml 文件?
在我们的一个项目中,我们收到 xml 文件,其中一些列存储另一个 xml。在将此数据加载到数据帧时,内部 xml 被转换为StringType(这不是故意的),因此在查询数据时无法访问节点(使用点运算符)。
我已经在网上生动地四处寻找答案,但没有运气。在 GitHub 中发现了一个与我的用例完全相同的未解决问题。链接在这里。
https://github.com/databricks/spark-xml/issues/140
我的 xml 源文件如下所示。
+------+--------------------+
| id | xml |
+------+--------------------+
| 6723 |<?xml version="1....|
| 6741 |<?xml version="1....|
| 6774 |<?xml version="1....|
| 6735 |<?xml version="1....|
| 6828 |<?xml version="1....|
| 6764 |<?xml version="1....|
| 6732 |<?xml version="1....|
| 6792 |<?xml version="1....|
| 6754 |<?xml version="1....|
| 6833 |<?xml version="1....|
+------+--------------------+
Run Code Online (Sandbox Code Playgroud)
在 SQL Server 中,为了将 xml 存储在数据库列中,有XML数据类型,但 Spark SQL 中不存在相同的数据类型。
有没有人遇到同样的问题并找到任何解决方法?如果是,请分享。我们正在使用 …
我正在尝试从JavaRDd <Book>和JavaRdd <Reviews>生成一个复杂的xml,我如何将这两个结合在一起以在xml之下生成?
<xml>
<library>
<books>
<book>
<author>test</author>
</book>
</books>
<reviews>
<review>
<id>1</id>
</review>
</reviews>
</library>
Run Code Online (Sandbox Code Playgroud)
如您所见,有一个父根库,其中包含子书和评论。
以下是我如何生成Book and Review Dataframe
DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);
Run Code Online (Sandbox Code Playgroud)
我知道要生成xml,而我的疑问尤其是对于拥有Library rootTag以及将Books and Reviews作为其子元素。
我正在使用Java。但是如果您可以指出正确的内容,则可以编写Scala或Python示例。
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset apache-spark-xml
我正在尝试使用 spark-xml jar 在 pyspark 中读取 xml/嵌套 xml。
df = sqlContext.read \
.format("com.databricks.spark.xml")\
.option("rowTag", "hierachy")\
.load("test.xml"
Run Code Online (Sandbox Code Playgroud)
当我执行时,数据框没有正确创建。
+--------------------+
| att|
+--------------------+
|[[1,Data,[Wrapped...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
下面提到了我的 xml 格式:
我使用Databricks spark-xml包将一个XML文件读入Spark.该文件具有以下数据结构:
<lib>
<element>
<genre>Thriller</genre>
<dates>
<date>2000-10-01</date>
<date>2020-10-01</date>
</dates>
</element>
<element>
<genre>SciFi</genre>
<dates>
<date>2015-10-01</date>
</dates>
</element>
</lib>
Run Code Online (Sandbox Code Playgroud)
加载数据后,我得到一个看起来像这样的数据框:
root
|-- genre: string (nullable = true)
|-- publish_dates: struct (nullable = true)
| |-- publish_date: array (nullable = true)
| | |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
并且"show"产生以下结果:
gerne | dates
Th... | [WrappedArray(20...
Run Code Online (Sandbox Code Playgroud)
是否有可能以下列形式获得此结果:
gerne | date
Th... | 2000-...
Th... | 2020-...
Sci.. | 2015-...
Run Code Online (Sandbox Code Playgroud)
我已经尝试过:
val rdd = df.select("genre", "dates").rdd.map(row => (row(0), row(1))).flatMapValues(_.toString)
Run Code Online (Sandbox Code Playgroud)
和
val rdd = df.select("genre", …Run Code Online (Sandbox Code Playgroud)