我试图准确理解如何通过原子操作安全地管理共享指针。事实证明 VC11 (Visual studio 2012) 支持 C++11,因此可以允许 std::shared_ptr 上的读/写竞争。我想检查一下我是否理解了基础知识,然后询问有关 VC11 中 std::shared_ptr 上原子操作的实现细节。
std::shared_ptr<A> x, y, z;
x = std::make_shared<A>(args1);
y = std::make_shared<A>(args2);
Run Code Online (Sandbox Code Playgroud)
线程1
std::shared_ptr<A> temp = std::atomic_load(&y);
Run Code Online (Sandbox Code Playgroud)
线程2
std::atomic_store(&y, z);
Run Code Online (Sandbox Code Playgroud)
如果没有原子,竞争可能会导致temp最终状态损坏,或者线程 2 可能会删除原始 y 所指向的 A 实例,就像线程 1 试图复制并添加共享指针一样,这将使其指向一个“僵尸”对象。
我关于VC11中atomic_load和atomic_store的问题:
我注意到他们使用自旋锁来对全局变量执行测试和设置。所以我想知道:为什么不在shared_ptr本身的引用计数器的最高位上进行测试和设置?这样不同的shared_ptr上的锁就不会相互竞争。没有这样做有什么原因吗?
编辑: VS 的实现atomic_is_lock_free。这并不奇怪,因为它对所有事情都使用自旋锁。仍然想知道为什么他们不能让它使用共享指针实例特定的锁而不是全局锁。
template <class _Ty> inline
bool atomic_is_lock_free(const shared_ptr<_Ty> *)
{ // return true if atomic operations on shared_ptr<_Ty> are lock-free
return (false);
}
Run Code Online (Sandbox Code Playgroud) 我的Spark应用程序是用Scala编写的.我正在写几个UDAF,它们对"矢量化"数据执行计算,其中每个值都是一个恒定大小(比如16)的双精度数组,而不是一个标量值.计算基于每个元素进行.我的目标是让UDAF尽可能高效地执行.为此,我希望双打在内存中连续出现,并且看到Spark的代码gen + JVM的JIT编译器将通过SIMD指令进行计算.
但是,似乎以直接的文档方式编写UDAF并使用标准DataFrame功能导致Spark为其Aggregator和Row对象生成非常低效的布局.数组中的数据到达我的代码,类型为WrappedArray [Double].它是Object []数组的代理,包含16个盒装双打.这不仅占实际原始数据的大约6-8倍的内存量,而且还使得不可能使用SIMD指令,因为双精度本身不在连续的存储器位置中.
例如,一个执行"矢量化和"的简单UDAF具有如下所示的更新函数:
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val acc = buffer.getAs[WrappedArray[Double]](0)
if (!input.isNullAt(0)) {
val value = input.getAs[WrappedArray[Double]](0)
for (i <- 0 until N) {
acc(i) += value(i)
}
buffer(0) = acc
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个操作,在一个写得很好的本机程序中,看起来像这样:
void update(double* acc, const double* input) {
for(size_t i = 0; i != N; ++i) {
acc[i] += input[i]
}
}
Run Code Online (Sandbox Code Playgroud)
我的UDAF发生了什么?据我所知,只是我需要最后buffer(0) = acc一行存在(或累加器不更新)这意味着正在复制数组内容.因此,它首先创建大小为N(acc和value)的新的两个对象数组,将原始的盒装双打复制到它们中,然后+ =创建一个新的盒装Double,其中包含每个元素的结果,将它们放回到acc中,然后是数组acc被复制回缓冲区(0)中的数组.
这简直太糟糕了.我还没有运行任何分析,但我完全期望第二个代码片段运行速度比这快20-50倍.
必须有更好的方法来做事.我刚才读过关于"Project Tungsten"的内容,显然Spark可以使用非托管内存缓冲区运行 - 我不是Spark开发人员,所以我甚至不知道我是否可以将这些功能用于我的UDAF,如果是这样的话?有没有其他方法来至少消除拳击和无用的阵列复制?
编辑:示例输入和输出如下 -
case class …Run Code Online (Sandbox Code Playgroud) 星火UDAFs需要您实现多种方法,特别是
def update(buffer: MutableAggregationBuffer, input: Row): Unit
和
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
假设我的测试中有一个UDAF X,4行(r0, r1, r2, r3)和两个聚合缓冲区A, B。我想看看这段代码产生了预期的结果:
X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)
Run Code Online (Sandbox Code Playgroud)
与仅使用一个缓冲区在4行中的每行上调用X.update相同:
X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)
Run Code Online (Sandbox Code Playgroud)
这样,可以测试两种方法的正确性。但是,我不知道如何编写这样的测试:用户代码似乎无法实例化的任何实现MutableAggregationBuffer。
如果仅从4行中创建一个DF,并尝试使用它groupBy().agg(...)来调用UDAF,Spark甚至不会尝试以这种特定方式合并它们-由于行数很少,因此不需要。
unit-testing scala user-defined-functions apache-spark apache-spark-sql