Ada*_*sky 6 camera image-processing kotlin kotlin-coroutines
我尝试使用 Kotlin 的Flow
类作为消息队列,将数据从生产者(相机)传输到在单独的协程上运行的一组工作人员(图像分析器)。
在我的例子中,生产者是一台相机,并且运行速度比工作人员快得多。应通过丢弃数据来处理背压,以便图像分析器始终对来自相机的最新图像进行操作。
使用通道时,此解决方案有效,但看起来很混乱,并且没有为我提供在相机和分析仪之间转换数据的简单方法(如flow.map
)。
class ImageAnalyzer<Result> {
fun analyze(image: Bitmap): Result {
// perform some work on the image and return a Result. This can take a long time.
}
}
class CameraAdapter {
private val imageChannel = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
private val imageReceiveMutex = Mutex()
// additional code to make this camera work and listen to lifecycle events of the enclosing activity.
protected fun sendImageToStream(image: CameraOutput) {
// use channel.offer to ensure the latest images are processed
runBlocking { imageChannel.offer(image) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy() {
runBlocking { imageChannel.close() }
}
/**
* Get the stream of images from the camera.
*/
fun getImageStream(): ReceiveChannel<Bitmap> = imageChannel
}
class ImageProcessor<Result>(workers: List<ImageAnalyzer<Result>>) {
private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
private val cancelMutex = Mutex()
var finished = false // this can be set elsewhere when enough images have been analyzed
fun subscribeTo(channel: ReceiveChannel<Bitmap>, processingCoroutineScope: CoroutineScope) {
// omit some checks to make sure this is not already subscribed
processingCoroutineScope.launch {
val workerScope = this
workers.forEachIndexed { index, worker ->
launch(Dispatchers.Default) {
startWorker(channel, workerScope, index, worker)
}
}
}
}
private suspend fun startWorker(
channel: ReceiveChannel<Bitmap>,
workerScope: CoroutineScope,
workerId: Int,
worker: ImageAnalyzer
) {
for (bitmap in channel) {
analysisResults.send(worker.analyze(bitmap))
cancelMutex.withLock {
if (finished && workerScope.isActive) {
workerScope.cancel()
}
}
}
}
}
class ExampleApplication : CoroutineScope {
private val cameraAdapter: CameraAdapter = ...
private val imageProcessor: ImageProcessor<Result> = ...
fun analyzeCameraStream() {
imageProcessor.subscribeTo(cameraAdapter.getImageStream())
}
}
Run Code Online (Sandbox Code Playgroud)
执行此操作的正确方法是什么?我想使用 aChannelFlow
而不是 aChannel
在相机和ImageProcessor
. 这将允许我flow.map
在将图像发送到分析器之前调用将元数据添加到图像中。然而,这样做时,每个 ImageAnalyzer 都会获取同一图像的副本,而不是并行处理不同的图像。是否可以使用 aFlow
作为消息队列而不是广播器?
我通过流程得到了这个!在整个序列中保持流程由通道支持非常重要,以便每个工作人员都能拾取独特的图像进行操作。我已经通过单元测试确认了此功能。
这是我为后代更新的代码:
class ImageAnalyzer<Result> {
fun analyze(image: Bitmap): Result {
// perform some work on the image and return a Result. This can take a long time.
}
}
class CameraAdapter {
private val imageStream = Channel<Bitmap>(capacity = Channel.RENDEZVOUS)
private val imageReceiveMutex = Mutex()
// additional code to make this camera work and listen to lifecycle events of the enclosing activity.
protected fun sendImageToStream(image: CameraOutput) {
// use channel.offer to enforce the drop back pressure strategy
runBlocking { imageChannel.offer(image) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy() {
runBlocking { imageChannel.close() }
}
/**
* Get the stream of images from the camera.
*/
fun getImageStream(): Flow<Bitmap> = imageChannel.receiveAsFlow()
}
class ImageProcessor<Result>(workers: List<ImageAnalyzer<Result>>) {
private val analysisResults = Channel<Result>(capacity = Channel.RENDEZVOUS)
private val cancelMutex = Mutex()
var finished = false // this can be set elsewhere when enough images have been analyzed
fun subscribeTo(flow: Flow<Bitmap>, processingCoroutineScope: CoroutineScope): Job {
// omit some checks to make sure this is not already subscribed
return processingCoroutineScope.launch {
val workerScope = this
workers.forEachIndexed { index, worker ->
launch(Dispatchers.Default) {
startWorker(flow, workerScope, index, worker)
}
}
}
}
private suspend fun startWorker(
flow: Flow<Bitmap>,
workerScope: CoroutineScope,
workerId: Int,
worker: ImageAnalyzer
) {
while (workerScope.isActive) {
flow.collect { bitmap ->
analysisResults.send(worker.analyze(bitmap))
cancelMutex.withLock {
if (finished && workerScope.isActive) {
workerScope.cancel()
}
}
}
}
}
fun getAnalysisResults(): Flow<Result> = analysisResults.receiveAsFlow()
}
class ExampleApplication : CoroutineScope {
private val cameraAdapter: CameraAdapter = ...
private val imageProcessor: ImageProcessor<Result> = ...
fun analyzeCameraStream() {
imageProcessor.subscribeTo(cameraAdapter.getImageStream())
}
}
Run Code Online (Sandbox Code Playgroud)
看来,只要流量有渠道支持,每个订阅者都会获得一个独特的图像。
归档时间: |
|
查看次数: |
3241 次 |
最近记录: |