小编Jon*_*ton的帖子

用于映射大数据的Python共享内存字典

我一直很难使用大型字典(~86GB,17.5亿个密钥)来处理使用Python中的多处理的大数据集(2TB).

上下文:将字符串映射到字符串的字典从pickle文件加载到内存中.加载后,将创建工作进程(理想情况下> 32),该进程必须在字典中查找值,但不能修改其内容,以便处理~2TB数据集.数据集需要并行处理,否则任务将花费一个月的时间.

这里有2 3 4 5 6 7 8 9办法(全部失败),我曾尝试:

  1. 将字典存储为Python程序中的全局变量,然后派生~32个工作进程.理论上这个方法可能有效,因为字典没有被修改,因此forkLinux上的COW机制意味着数据结构将被共享而不是在进程之间复制.然而,当我尝试这一点,我的程序崩溃的os.fork()内部multiprocessing.Pool.mapOSError: [Errno 12] Cannot allocate memory.我确信这是因为内核配置为永远不会过度使用内存(/proc/sys/vm/overcommit_memory设置为2,我无法在机器上配置此设置,因为我没有root访问权限).

  2. 将字典加载到共享内存字典中multiprocessing.Manager.dict.通过这种方法,我能够在不崩溃的情况下分叉32个工作进程,但后续数据处理比不需要字典的任务的另一个版本慢了几个数量级(唯一的区别是没有字典查找).我认为这是因为包含字典的管理器进程与每个工作进程之间的进程间通信,这是每次单个字典查找所必需的.虽然字典没有被修改,但它被访问了很多次,通常是由许多进程同时访问.

  3. 将字典复制到C++中std::map并依赖Linux的COW机制来防止它被复制(如方法#1,除了C++中的字典).通过这种方法,花了很长的时间来加载到字典std::map,并随后从坠毁ENOMEMos.fork()像以前一样.

  4. 将字典复制到pyshmht.复制字典需要太长时间pyshmht.

  5. 尝试使用SNAP的HashTable.C++中的底层实现允许在共享内存中制作和使用它.不幸的是,Python API不提供此功能.

  6. 使用PyPy.崩溃仍然发生在#1中.

  7. 在python中实现我自己的共享内存哈希表multiprocessing.Array.这种方法仍导致#1中出现内存不足错误.

  8. 将字典转储到dbm.在尝试将字典转储到dbm数据库中四天并看到"33天"的ETA之后,我放弃了这种方法.

  9. 将字典转储到Redis中.当我尝试将字典(86GB dict从1024个较小的dicts中加载)转储到Redis时,redis.mset我通过对等错误重置连接.当我尝试使用循环转储键值对时,需要很长时间.

如何在不需要进程间通信的情况下有效地并行处理此数据集,以便在此字典中查找值.我欢迎任何解决这个问题的建议!

我在Ubuntu上使用Anaconda的Python 3.6.3在具有1TB RAM的机器上.


编辑:最终有效:

我能够使用Redis来实现这一点.为了解决#9中发布的问题,我不得不将大的键值插入和查询查询分成"一口大小"的块,这样它仍然可以批量处理,但是没有超出查询的超时时间.这样做允许在45分钟内插入86GB字典(128个线程和一些负载平衡),并且后续处理不会受到Redis查询查询(在2天内完成)的性能影响.

谢谢大家的帮助和建议.

python dictionary bigdata python-multiprocessing

10
推荐指数
4
解决办法
2262
查看次数

具有多个输入的 Tensorflow 2.0 自定义损失函数

我正在尝试使用以下两个损失函数优化模型

def loss_1(pred, weights, logits):
    weighted_sparse_ce = kls.SparseCategoricalCrossentropy(from_logits=True)
    policy_loss = weighted_sparse_ce(pred, logits, sample_weight=advantages)
Run Code Online (Sandbox Code Playgroud)

def loss_2(y_pred, y):
    return kls.mean_squared_error(y_pred, y)
Run Code Online (Sandbox Code Playgroud)

然而,因为 TensorFlow 2 期望损失函数的形式为

def fn(y_pred, y_true):
    ...
Run Code Online (Sandbox Code Playgroud)

我正在使用一种变通方法来解决loss_1我打包predweights放入单个张量的地方,然后再将其传递到loss_1调用中model.fit,然后将它们解压到loss_1. 这是不雅和讨厌的,因为predweights具有不同的数据类型,因此每次调用model.fit.

此外,我知道 的sample_weight论点fit,这有点像这个问题的解决方案。如果不是因为我使用了两个损失函数并且我只想将其sample_weight应用于其中一个,这可能是一个可行的解决方案。此外,即使这是一个解决方案,它也不能推广到其他类型的自定义损失函数。


话虽如此,我的问题简明地说是:

在 TensorFlow 2 中创建具有任意数量参数的损失函数的最佳方法是什么?

我尝试过的另一件事是传递 atf.tuple但这似乎也违反了 TensorFlow 对损失函数输入的期望。

keras tensorflow tensorflow2.0

9
推荐指数
2
解决办法
8641
查看次数

为什么该功能不能在Haskell中终止?

我很困惑,为什么我的功能nest,其组成f与自己n时代

nest f 0 = id
nest f n = f . nest f (n - 1)
Run Code Online (Sandbox Code Playgroud)

永不终止。我本以为在n为零的情况下会“匹配模式” 。我通过在GHCI中键入这两行并使用nest (+ 1) 2 3进行调用来定义它。

haskell

8
推荐指数
2
解决办法
110
查看次数

在foreach中使用ref的替代方法?

我有一个带有签名的修改方法

private bool Modify(ref MyClass obj);
Run Code Online (Sandbox Code Playgroud)

这将修改obj并表明其返回值的成功.Modify是不是重新分配引用(我知道这不起作用),只是修改实例字段,所以我想用它来做类似以下的事情:

foreach(MyClass obj in myList)
{
    bool success = Modify(obj);
    // do things depending on success
}
Run Code Online (Sandbox Code Playgroud)

我正在遇到一个问题,因为obj"没有使用ref关键字传递".但是,如果我像这样放入ref关键字:

bool success = Modify(ref obj);
Run Code Online (Sandbox Code Playgroud)

我得到"不能obj用作ref/ out因为它是'foreach迭代变量'.我知道foreach使用了一个不可变的迭代器,这就是为什么这不起作用.

我的问题是,做这样的工作最容易的替代方案是什么?

我试过用

foreach(int i = 0; i < myList.Count; i++)
{
    bool success = Modify(ref myList[i]);
    // do things depending on success
}
Run Code Online (Sandbox Code Playgroud)

但他们得到"属性或索引器可能不会作为ref参数传递".

谢谢你的帮助.

c# foreach ref

7
推荐指数
2
解决办法
8229
查看次数

如何使用布尔模板参数启用成员函数?

我希望一个类具有 , 的两种不同实现push,并根据布尔模板参数进行选择。我尝试使用本答案中描述的 SFINAE 原则,如下所示:

template<class T, bool foo=true>
class Bar {
  template <>
  typename std::enable_if<foo>::type
  push(const T& value) { /* one implementation */}

  template <>
  typename std::enable_if<!foo>::type
  push(const T& value) { /* another implementation */ } 
}
Run Code Online (Sandbox Code Playgroud)

但是,我push在 gcc 下收到“无法专门化类范围内的函数”的错误,我不明白为什么。尽管我的代码与链接答案中的代码不完全一样,但它似乎非常相似,我无法发现关键区别。

我还尝试使用与此答案中建议的语法类似的语法,但它也不起作用(错误是“不能重新声明类成员”):

  template <bool enable=foo>
  typename std::enable_if<enable>::type
  push(const T& value) { /* one implementation */}

  template <bool enable=!foo>
  typename std::enable_if<enable>::type
  push(const T& value) { /* another implementation */ } 
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?

c++ templates sfinae

4
推荐指数
1
解决办法
1324
查看次数

如何删除指向动态分配对象的智能指针?

我做了一个迭代器,当取消引用时,它返回std::shared_ptr指向动态分配的副本std::pair(在new迭代器的内部创建)。迭代器按原样运行,但希望pair在循环结束时释放,以防止内存泄漏。

for (auto it = parser.begin(); it != parser.end(); ++it) {
    shared_ptr<pair<string, string>> record = *it;
    // Analysis of pair
    // delete pair with delete, or reset
}
Run Code Online (Sandbox Code Playgroud)

但是,我在释放时遇到了麻烦pair。我已经尝试了delete recorddelete *recordrecord.reset(),但是这些都不会编译。

c++ memory-management smart-pointers shared-ptr

2
推荐指数
1
解决办法
5538
查看次数