小编jk1*_*jk1的帖子

8
推荐指数
2
解决办法
2316
查看次数

为什么Spark重新分区会导致MemoryOverhead?

所以问题就在主题中。我想我没有正确理解重新分区的工作。在我看来,当我说somedataset.repartition(600)我希望所有数据都将在工作人员之间按相同大小进行划分(假设有 60 个工作人员)。

例如。我会将大量数据加载到不平衡的文件中,比如说 400 个文件,其中 20% 的大小为 2Gb,其他 80% 的大小约为 1Mb。我有加载此数据的代码:

val source = sparkSession.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter","\t")
  .load(mypath)
Run Code Online (Sandbox Code Playgroud)

我想将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(具有附加属性),然后按某些列分区并写入镶木地板。在我看来,在工作人员之间平衡数据(40000 个分区)似乎是合理的,而不是像这样进行工作:

val ds: Dataset[FinalObject] = source.repartition(600)
  .map(parse)
  .filter(filter.IsValid(_))
  .map(convert)
  .persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")

ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
  .write.partitionBy(partitionColumns:_*)
  .format("parquet")
  .mode(SaveMode.Append)
  .save(destUrl)
Run Code Online (Sandbox Code Playgroud)

但它失败了

ExecutorLostFailure(执行程序 7 因正在运行的任务之一而退出) 原因:容器因超出内存限制而被 YARN 终止。已使用 34.6 GB 物理内存或 34.3 GB 物理内存。考虑提高spark.yarn.executor.memoryOverhead。

当我不进行重新分区时,一切都很好。我哪里不明白重新分区正确吗?

scala partitioning apache-spark

7
推荐指数
1
解决办法
4685
查看次数

如何在 Athena 中查询 NaN 双精度值

我需要查询类似 AWS Athena 中的内容

SELECT * FROM "hl"."may" where fqk = 'NaN' limit 10
Run Code Online (Sandbox Code Playgroud)

sql presto parquet amazon-athena

5
推荐指数
1
解决办法
5952
查看次数

肥皂响应编码“?” 所有字符串中的字符而不是俄罗斯.Net代理,Java服务器(?)

我使用 wsdl.exe 生成代理类来请求 Web 服务,这些服务可能是在 java 平台上构建的。问题在于响应的编码。我得到 \'?\' 而不是俄语字母。(例如 \'????26\' 而不是 \'\xd0\x90\xd0\x9d26\' ) \n我也使用soapUI,一切正常。我在配置 .Net 客户端方面没有经验。那么我如何确定和配置正确的响应编码。我已经使用了 app.config,如下所示:\n

\n\n

我在这里附加标题信息。我不知道响应标头中的编码信息...\n请求标头:

\n\n
Accept-Encoding: gzip,deflate\nContent-Type: text/xml;charset=UTF-8\nSOAPAction: "urn:#DCSSci_ListFlight_5"\nContent-Length: 641\nHost: 109.73.1.66:23022\nConnection: Keep-Alive\nUser-Agent: Apache-HttpClient/4.1.1 (java 1.5)\n
Run Code Online (Sandbox Code Playgroud)\n\n

响应标头:

\n\n
HTTP/1.1 200 OK\nDate: Thu, 06 Sep 2012 03:47:52 GMT\nServer: Apache/2.2.10 (Linux/SUSE)\n200 OKX-FidelXML-Version: 2.0\nContent-length: 15464\nKeep-Alive: timeout=15, max=100\nConnection: Keep-Alive\nContent-Type: text/xml\n
Run Code Online (Sandbox Code Playgroud)\n\n

解决方案:

\n\n
public class TraceExtension : SoapExtension\n{\n    Stream oldStream;\n    Stream newStream;\n    public override Stream ChainStream(Stream stream)\n    {\n        oldStream = stream;\n        newStream = new MemoryStream();\n        return …
Run Code Online (Sandbox Code Playgroud)

.net java encoding soap config

2
推荐指数
1
解决办法
8301
查看次数

Why SPARK repeat transformations after persist operations?

I have next code. I am doing count to perform persist operation and fix transformations above. But I noticed that DAG and stages for 2 different count Jobs calls first persist twice (when I expect second persist method to be called in second count call)

val df = sparkSession.read
      .parquet(bigData)
      .filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
      .as[SafegraphRawData]
      // So repartition here to be able perform shuffle operations later
      // another transformations and minor filtration
      .repartition(nrInputPartitions)
      // Firstly persist here since objects …
Run Code Online (Sandbox Code Playgroud)

scala persist directed-acyclic-graphs apache-spark

1
推荐指数
1
解决办法
1178
查看次数