我有一个火花python应用程序被纱线杀死超过内存限制.我有一个步骤涉及加载一些有点重(500+ MB)的资源,所以我正在使用mapPartitions.基本上:
def process_and_output(partition):
resources = load_resources()
for record in partition:
yield transform_record(resources, record)
input = sc.textFile(input_location)
processed = input.mapPartitions(process_and_output)
processed.saveAsTextFile(output_location)
Run Code Online (Sandbox Code Playgroud)
运行时,我一直收到此错误:
错误YarnScheduler:丢失执行程序1(已删除地址):由于超出内存限制而被YARN杀死的容器.使用11.4 GB的11.2 GB物理内存.考虑提升spark.yarn.executor.memoryOverhead.
我试过将memoryOverhead提升到极高,但仍然是同样的问题.我跑了:
--conf "spark.python.worker.memory=1200m" \
--conf "spark.yarn.executor.memoryOverhead=5300" \
--conf "spark.executor.memory=6g" \
Run Code Online (Sandbox Code Playgroud)
当然,这足够的记忆总结?
我想更普遍的是,我很难理解蟒蛇工作者的记忆是如何在总体中控制/计算的.有没有这方面的文件?
我还想了解使用生成器函数是否会减少内存使用量.它会通过python进程流式传输数据(就像我希望的那样)还是会在发送回JVM/spark基础设施之前将其缓冲?
我试图抽象插入不同类型的对象到类似结构的SQL表.这是我正在尝试做的事情:
class TableAccess[A : Meta](table: String) {
def insert(key: String, a: A): ConnectionIO[Unit] = {
(fr"insert into " ++ Fragment.const(table) ++ fr" values ($key, $a);").update.run.map(_ => ())
}
}
Run Code Online (Sandbox Code Playgroud)
但我得到这个编译错误:
[error] diverging implicit expansion for type doobie.util.param.Param[A]
[error] starting with method fromMeta in object Param
[error] (fr"insert into " ++ Fragment.const(table) ++ fr" values ($key, $a);").update.run.map(_ => ())
Run Code Online (Sandbox Code Playgroud)
我在文档中找到的只有:
doobie允许您使用Meta实例插入任何类型的值(及其选项),其中包括...
但在这种情况下似乎还不够; 我需要什么样的类型类/进口/转换?
我希望能够创建一个读取序列文件的自定义InputFormat,但另外公开记录所在文件中的文件路径和偏移量.
退一步,这是用例:我有一个包含可变大小数据的序列文件.密钥几乎不相关,并且值高达几兆字节,包含各种不同的字段.我想将elasticsearch中的一些字段与文件名和偏移量一起索引.这样,我可以从elasticsearch中查询这些字段,然后使用文件名和偏移量返回到序列文件并获取原始记录,而不是将整个内容存储在ES中.
我将整个过程作为单个java程序运行.该SequenceFile.Reader类提供了方便getPosition和seek方法来实现这一目标.
但是,最终会涉及到数TB的数据,所以我需要将其转换为MapReduce作业(可能只有Map).由于序列文件中的实际键是无关紧要的,我希望采用的方法是创建一个自定义的InputFormat,它扩展或以某种方式利用SquenceFileInputFormat,而不是返回实际的键,而是返回一个由文件组成的复合键和抵消.
然而,事实证明这在实践中更加困难.看起来它应该是可能的,但考虑到实际的API以及暴露的内容,它很棘手.有任何想法吗?也许我应该采取另一种方法?
有没有办法运行DistCp,但有一个选项可以重命名文件名冲突?也许最简单的解释一个例子.
假设我正在复制到hdfs:/// foo到hdfs:/// bar,而foo包含这些文件:
hdfs:///foo/a
hdfs:///foo/b
hdfs:///foo/c
Run Code Online (Sandbox Code Playgroud)
和栏包含以下内容:
hdfs:///bar/a
hdfs:///bar/b
Run Code Online (Sandbox Code Playgroud)
然后复制之后,我想要条形图包含:
hdfs:///bar/a
hdfs:///bar/a-copy1
hdfs:///bar/b
hdfs:///bar/b-copy1
hdfs:///bar/c
Run Code Online (Sandbox Code Playgroud)
如果没有这样的选择,那么最可靠/最有效的方法是什么?我自己的本土版本的distcp当然可以完成它,但这似乎可能是很多工作,而且很容易出错.基本上,我根本不关心文件名,只关心它们的目录,我想定期将大量数据复制到"合并"目录中.
我有一个图表,其中每个顶点都有一个ID(永远不会改变)和一个标签(经常变化).两者都以长子为代表.
目前,我定义了以下类型:
type VertexId = Long
type VertexLabel = Long
Run Code Online (Sandbox Code Playgroud)
但是,我今天发现了一个错误,我将VertexLabel传递给了一个期望VertexId的函数.这似乎是scala编译器应该能够阻止的类型.
我考虑过这样做:
case class VertexId(id: Long)
case class VertexLabel(label: Long)
Run Code Online (Sandbox Code Playgroud)
但随后还有"拳击"和"拆箱"的额外开销以及一些额外的内存使用量.
反正是为了获得两全其美?将这两种类型定义为Longs,以便编译器阻止您使用另一种类型?
我最近将我的应用程序转换为继承猫的应用程序,如此处IOApp所述。我在该文档中读到:
\n\n\nTimer[IO] 依赖项已由 IOApp 提供,因此在 JVM 之上,\xe2\x80\x99 不再需要隐式 ExecutionContext 位于范围内
\n
但是,我正在与其他几个确实需要ExecutionContext. 是否有推荐的方式来获取此类应用程序?老好人import scala.concurrent.ExecutionContext.Implicits.global对所提供的东西玩得好吗Timer[IO]?
hadoop ×3
scala ×3
mapreduce ×2
apache-spark ×1
cats-effect ×1
distcp ×1
doobie ×1
hadoop-yarn ×1
java ×1
pyspark ×1
python ×1
scala-cats ×1
sequencefile ×1
sql ×1
type-safety ×1
types ×1