似乎Java Streams的并行化的核心是ForEachTask.理解其逻辑似乎对于获取预测针对Streams API编写的客户端代码的并发行为所必需的心智模型至关重要.然而,我发现我的预期与实际行为相矛盾.
作为参考,这是关键compute()方法(java/util/streams/ForEachOps.java:253):
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1); …Run Code Online (Sandbox Code Playgroud) 从CPU的开始,一般知道整数除法指令是昂贵的.我去看看今天有多糟糕,在拥有数十亿晶体管的CPU上.我发现硬件idiv指令对于常量除数的性能仍然比JIT编译器能够发出的代码差得多,后者不包含idiv指令.
为了在专用的微基准测试中实现这一点,我写了以下内容:
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@OperationsPerInvocation(MeasureDiv.ARRAY_SIZE)
@Warmup(iterations = 8, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@State(Scope.Thread)
@Fork(1)
public class MeasureDiv
{
public static final int ARRAY_SIZE = 128;
public static final long DIVIDEND_BASE = 239520948509234807L;
static final int DIVISOR = 10;
final long[] input = new long[ARRAY_SIZE];
@Setup(Level.Iteration) public void setup() {
for (int i = 0; i < input.length; i++) {
input[i] = DIVISOR;
}
}
@Benchmark …Run Code Online (Sandbox Code Playgroud) 我刚刚开始学习Kotlin协同程序,并试图模拟一些长时间的API调用,并在UI上显示结果:
class MainActivity : AppCompatActivity() {
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
override
fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
this.setContentView(R.layout.activity_main)
val resultTV = findViewById(R.id.text) as TextView
val a = async(CommonPool) {
delay(1_000L)
6
}
val b = async(CommonPool) {
delay(1_000L)
7
}
launch(< NEED UI thread here >) {
val aVal = a.await()
val bVal = b.await()
resultTV.setText((aVal * bVal).toString())
}
}
}
Run Code Online (Sandbox Code Playgroud)
我不明白我怎么可能launch在main上下文中使用方法.
不幸的是,我无法找到任何关于为协同程序的官方教程提供某些特定线程的结果的信息.
我最近潜入Kotlin协同程序因为我使用了很多Google的库,所以大部分工作都是在Task类中完成的
目前我正在使用此扩展来暂停协程
suspend fun <T> awaitTask(task: Task<T>): T = suspendCoroutine { continuation ->
task.addOnCompleteListener { task ->
if (task.isSuccessful) {
continuation.resume(task.result)
} else {
continuation.resumeWithException(task.exception!!)
}
}
}
Run Code Online (Sandbox Code Playgroud)
但最近我见过这样的用法
suspend fun <T> awaitTask(task: Task<T>): T = suspendCoroutine { continuation ->
try {
val result = Tasks.await(task)
continuation.resume(result)
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}
Run Code Online (Sandbox Code Playgroud)
有什么区别,哪一个是正确的?
UPD:第二个例子不起作用,idk为什么
我正在学习Kotlin协同程序.我读过这runBlocking是桥接同步和异步代码的方法.但是如果runBlocking停止UI线程,性能提升是多少?例如,我需要在Android中查询数据库:
val result: Int
get() = runBlocking { queryDatabase().await() }
private fun queryDatabase(): Deferred<Int> {
return async {
var cursor: Cursor? = null
var queryResult: Int = 0
val sqlQuery = "SELECT COUNT(ID) FROM TABLE..."
try {
cursor = getHelper().readableDatabase.query(sqlQuery)
cursor?.moveToFirst()
queryResult = cursor?.getInt(0) ?: 0
} catch (e: Exception) {
Log.e(TAG, e.localizedMessage)
} finally {
cursor?.close()
}
return@async queryResult
}
}
Run Code Online (Sandbox Code Playgroud)
查询数据库会停止主线程,所以它似乎需要与同步代码相同的时间?如果我错过了什么,请纠正我.
在阅读了CoroutineScope的介绍和javadoc后,我仍然有点混淆了背后的想法CoroutineScope是什么.
文档的第一句"定义新协程的范围".我不清楚:为什么我的协同程序需要一个范围?
另外,为什么单独的协同构建器被弃用?为什么这样做更好:
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
Run Code Online (Sandbox Code Playgroud)
代替
fun produceSquares(): ReceiveChannel<Int> = produce { //no longer an extension function
for (x in 1..5) send(x * x)
}
Run Code Online (Sandbox Code Playgroud) 我不清楚将文件分成多个部分然后单独下载每个部分的概念.根据我的说法,我们只有在互联网上存在的那个文件的路径,那么如何通过知道URL或路径来破解这个文件呢?
在研究分代垃圾收集器对应用程序性能的微妙后果时,我已经在一个非常基本的操作 - 一个简单的写入堆位置 - 的性能方面遇到了相当惊人的差异 - 关于所写的值是原始值还是引用.
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 3, time = 1)
@State(Scope.Thread)
@Threads(1)
@Fork(2)
public class Writing
{
static final int TARGET_SIZE = 1024;
static final int[] primitiveArray = new int[TARGET_SIZE];
static final Object[] referenceArray = new Object[TARGET_SIZE];
int val = 1;
@GenerateMicroBenchmark
public void fillPrimitiveArray() {
final int primitiveValue = val++;
for (int i = 0; i < TARGET_SIZE; i++)
primitiveArray[i] = primitiveValue;
}
@GenerateMicroBenchmark
public void fillReferenceArray() …Run Code Online (Sandbox Code Playgroud) 我编写了3个简单的程序来测试协程的线程性能优势.每个程序都进行了许多常见的简单计算.所有程序都是彼此分开运行的.除了执行时间,我通过Visual VMIDE插件测量CPU使用率.
第一个程序使用1000-threaded池进行所有计算.64326 ms由于频繁的上下文更改,这段代码显示了最差的结果()与其他结果相比:
val executor = Executors.newFixedThreadPool(1000)
time = generateSequence {
measureTimeMillis {
val comps = mutableListOf<Future<Int>>()
for (i in 1..1_000_000) {
comps += executor.submit<Int> { computation2(); 15 }
}
comps.map { it.get() }.sum()
}
}.take(100).sum()
println("Completed in $time ms")
executor.shutdownNow()
Run Code Online (Sandbox Code Playgroud)第二个程序具有相同的逻辑,但1000-threaded它不使用池,而是仅使用n-threaded池(其中n等于机器核心的数量).它显示了更好的结果(43939 ms)并且使用更少的线程,这也是好的.
val executor2 = Executors.newFixedThreadPool(4)
time = generateSequence {
measureTimeMillis {
val comps = mutableListOf<Future<Int>>()
for (i in 1..1_000_000) {
comps += executor2.submit<Int> …Run Code Online (Sandbox Code Playgroud)我正在经历共同惯例的概念,以及它在kotlin中的使用和实现.
我用Google搜索并阅读了几个答案,就其在架构和性能方面与线程的不同之处而言.
这里解释得很好,
很公平,合作例程很棒,没有内存开销,性能卓越,没有死锁,竞争条件等等,而且易于使用.
现在,这里有一些事情,我很困惑,并希望更清楚 -
协同例程很好用,但它如何利用多个内核来提高性能.
kotlin ×6
android ×4
java ×3
jmh ×2
benchmarking ×1
coroutine ×1
download ×1
io ×1
java-8 ×1
java-stream ×1
networking ×1