我正在尝试使用open mp加速稀疏矩阵向量产品,代码如下:
void zAx(double * z, double * data, long * colind, long * row_ptr, double * x, int M){
long i, j, ckey;
int chunk = 1000;
//int * counts[8]={0};
#pragma omp parallel num_threads(8)
{
#pragma omp for private(ckey,j,i) schedule(static,chunk)
for (i=0; i<M; i++ ){
z[i]=0;
for (ckey=row_ptr[i]; ckey<row_ptr[i+1]; ckey++) {
j = colind[ckey];
z[i] += data[ckey]*x[j];
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在,这个代码运行正常,并产生正确的结果,但它只给我加速~30%.我已经检查过,线程都得到大约相同数量的非零元素(它们是),并且矩阵相当大(300,000 x 300,000),所以我希望开销不是唯一的问题.我也尝试使用不同的块大小和线程数运行,我得到了类似的性能.
还有什么我可以试着从中获得一点额外的速度吗?或者我显然做错了什么?
干杯.
编辑:刚刚注释掉'// int*counts [8] = {0}',因为它是计算工作分配的剩余部分.不需要
Edit2(更多细节):
好的,所以我定时拨打5000次循环并得到平均时间:
矩阵的大小为:303544x303544 …
这个问题在某种程度上是这里发布的问题的扩展: SWIG_SHARED_PTR macro with templated class 尽管这个问题可能完全无关。
基本设置是这样的:我试图让 SWIG 将模板化类包装为 shared_ptr。所以接口文件应该是这样的
%shared_ptr(template_instance)
%include template_class.cpp
%template(vector_instance) template_class<int>;
Run Code Online (Sandbox Code Playgroud)
现在的问题是,template_class有很多派生类,这导致swig中出现很多警告,然后构建错误。这些类不需要作为shared_ptr's处理,所以我宁愿忽略上面代码生成的警告。错误的解决方法似乎是:
%shared_ptr(template_derived1)
%shared_ptr(template_derived2)
.
.
.
%shared_ptr(template_derivedn)
%shared_ptr(template_instance)
%include template_class.cpp
%template(vector_instance) template_class<int>;
Run Code Online (Sandbox Code Playgroud)
这是有效的,但是是一团糟,我认为将所有内容都表示为 shared_ptr 肯定有一些缺点(它是什么?)。这附近有人吗?
编辑:使用特定示例更新
测试.h
class Base
{
int base_member;
};
class Derived : public Base
{
int derived_member;
};
Run Code Online (Sandbox Code Playgroud)
测试文件
%module test
%{
#include "test.h"
#include <boost/shared_ptr.hpp>
%}
%include <boost_shared_ptr.i>
%shared_ptr(Base)
%include test.h
Run Code Online (Sandbox Code Playgroud)
命令:
swig -python -c++ test.i
g++ -fPIC -I /usr/include/python2.7 -c test_wrap.cxx
Run Code Online (Sandbox Code Playgroud)
在这个精简的示例中,swig 调用给出警告,而 g++ …
我对 Flink 在事件时间加水印时如何处理后期元素感到有些困惑。
我的理解是,当 Flink 读取数据流时,水印时间会在看到任何事件时间比当前水印事件时间大的数据时进行。然后,任何覆盖时间严格小于水印的窗口都会被触发驱逐(假设没有延迟允许。
但是,以这个最小的例子为例:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}
object EventTimeExample {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
case class ExampleType(time: Long, value: Long)
def main(args: Array[String]) {
// Set up environment
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Example S3 path
val simple = env.fromCollection(Seq(
ExampleType(1525132800000L, 1),
ExampleType(1525132800000L, 2) ,
ExampleType(1525132920000L, 3),
ExampleType(1525132800000L, 4)
))
.assignAscendingTimestamps(_.time)
val windows = simple
.windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply{
(window, iter, collector: Collector[(Long, Long, String)]) => {
collector.collect(window.getStart, …Run Code Online (Sandbox Code Playgroud)