小编Joe*_*e K的帖子

Python Spark/Yarn内存使用情况

我有一个火花python应用程序被纱线杀死超过内存限制.我有一个步骤涉及加载一些有点重(500+ MB)的资源,所以我正在使用mapPartitions.基本上:

def process_and_output(partition):
    resources = load_resources()
    for record in partition:
        yield transform_record(resources, record)

input = sc.textFile(input_location)
processed = input.mapPartitions(process_and_output)
processed.saveAsTextFile(output_location)
Run Code Online (Sandbox Code Playgroud)

运行时,我一直收到此错误:

错误YarnScheduler:丢失执行程序1(已删除地址):由于超出内存限制而被YARN杀死的容器.使用11.4 GB的11.2 GB物理内存.考虑提升spark.yarn.executor.memoryOverhead.

我试过将memoryOverhead提升到极高,但仍然是同样的问题.我跑了:

--conf "spark.python.worker.memory=1200m" \
--conf "spark.yarn.executor.memoryOverhead=5300" \
--conf "spark.executor.memory=6g" \
Run Code Online (Sandbox Code Playgroud)

当然,这足够的记忆总结?

我想更普遍的是,我很难理解蟒蛇工作者的记忆是如何在总体中控制/计算的.有没有这方面的文件?

我还想了解使用生成器函数是否会减少内存使用量.它会通过python进程流式传输数据(就像我希望的那样)还是会在发送回JVM/spark基础设施之前将其缓冲?

python hadoop hadoop-yarn apache-spark pyspark

10
推荐指数
2
解决办法
1335
查看次数

Scala doobie片段与泛型类型参数

我试图抽象插入不同类型的对象到类似结构的SQL表.这是我正在尝试做的事情:

class TableAccess[A : Meta](table: String) {
  def insert(key: String, a: A): ConnectionIO[Unit] = {
    (fr"insert into " ++ Fragment.const(table) ++ fr" values ($key, $a);").update.run.map(_ => ())
  }
}
Run Code Online (Sandbox Code Playgroud)

但我得到这个编译错误:

[error] diverging implicit expansion for type doobie.util.param.Param[A]
[error] starting with method fromMeta in object Param
[error]     (fr"insert into " ++ Fragment.const(table) ++ fr" values ($key, $a);").update.run.map(_ => ())
Run Code Online (Sandbox Code Playgroud)

我在文档中找到的只有:

doobie允许您使用Meta实例插入任何类型的值(及其选项),其中包括...

但在这种情况下似乎还不够; 我需要什么样的类型类/进口/转换?

sql scala doobie

10
推荐指数
1
解决办法
505
查看次数

扩展SequenceFileInputFormat以包含文件名+偏移量

我希望能够创建一个读取序列文件的自定义InputFormat,但另外公开记录所在文件中的文件路径和偏移量.

退一步,这是用例:我有一个包含可变大小数据的序列文件.密钥几乎不相关,并且值高达几兆字节,包含各种不同的字段.我想将elasticsearch中的一些字段与文件名和偏移量一起索引.这样,我可以从elasticsearch中查询这些字段,然后使用文件名和偏移量返回到序列文件并获取原始记录,而不是将整个内容存储在ES中.

我将整个过程作为单个java程序运行.该SequenceFile.Reader类提供了方便getPositionseek方法来实现这一目标.

但是,最终会涉及到数TB的数据,所以我需要将其转换为MapReduce作业(可能只有Map).由于序列文件中的实际键是无关紧要的,我希望采用的方法是创建一个自定义的InputFormat,它扩展或以某种方式利用SquenceFileInputFormat,而不是返回实际的键,而是返回一个由文件组成的复合键和抵消.

然而,事实证明这在实践中更加困难.看起来它应该是可能的,但考虑到实际的API以及暴露的内容,它很棘手.有任何想法吗?也许我应该采取另一种方法?

java hadoop mapreduce sequencefile

7
推荐指数
1
解决办法
1227
查看次数

Hadoop DistCp通过重命名处理相同的文件名

有没有办法运行DistCp,但有一个选项可以重命名文件名冲突?也许最简单的解释一个例子.

假设我正在复制到hdfs:/// foo到hdfs:/// bar,而foo包含这些文件:

hdfs:///foo/a
hdfs:///foo/b
hdfs:///foo/c
Run Code Online (Sandbox Code Playgroud)

和栏包含以下内容:

hdfs:///bar/a
hdfs:///bar/b
Run Code Online (Sandbox Code Playgroud)

然后复制之后,我想要条形图包含:

hdfs:///bar/a
hdfs:///bar/a-copy1
hdfs:///bar/b
hdfs:///bar/b-copy1
hdfs:///bar/c
Run Code Online (Sandbox Code Playgroud)

如果没有这样的选择,那么最可靠/最有效的方法是什么?我自己的本土版本的distcp当然可以完成它,但这似乎可能是很多工作,而且很容易出错.基本上,我根本不关心文件名,只关心它们的目录,我想定期将大量数据复制到"合并"目录中.

hadoop mapreduce distcp

5
推荐指数
1
解决办法
726
查看次数

Scala类型 - 安全性与开销(类型与类)

我有一个图表,其中每个顶点都有一个ID(永远不会改变)和一个标签(经常变化).两者都以长子为代表.

目前,我定义了以下类型:

type VertexId = Long
type VertexLabel = Long
Run Code Online (Sandbox Code Playgroud)

但是,我今天发现了一个错误,我将VertexLabel传递给了一个期望VertexId的函数.这似乎是scala编译器应该能够阻止的类型.

我考虑过这样做:

case class VertexId(id: Long)
case class VertexLabel(label: Long)
Run Code Online (Sandbox Code Playgroud)

但随后还有"拳击"和"拆箱"的额外开销以及一些额外的内存使用量.

反正是为了获得两全其美?将这两种类型定义为Longs,以便编译器阻止您使用另一种类型?

types scala type-safety

3
推荐指数
1
解决办法
105
查看次数

scala cats IOApp 应如何获取 ExecutionContext?

我最近将我的应用程序转换为继承猫的应用程序,如此IOApp所述。我在该文档中读到:

\n\n
\n

Timer[IO] 依赖项已由 IOApp 提供,因此在 JVM 之上,\xe2\x80\x99 不再需要隐式 ExecutionContext 位于范围内

\n
\n\n

但是,我正在与其他几个确实需要ExecutionContext. 是否有推荐的方式来获取此类应用程序?老好人import scala.concurrent.ExecutionContext.Implicits.global对所提供的东西玩得好吗Timer[IO]

\n

scala scala-cats cats-effect

2
推荐指数
1
解决办法
2115
查看次数