我正在寻找一些看似简单的东西,一个带有"添加"和"排泄"的非阻塞版本的集合.像这样的东西:
List itemsToProcess = queue.addOrDrainAndAdd( item );
if ( itemsToProcess != null )
process( items );
Run Code Online (Sandbox Code Playgroud)
在我看来,如果我将这些作为单独的"offer"和"drainTo"调用,我可以提供两次调用,然后再进行第一次调用.我还需要一个像"while(!queue.offer(item))"这样的循环,这样在它耗尽后,报价就会起作用,我认为这也需要我检查排水是否返回一个空集合(因为两个可能会叫排水管).我天真的实现是这样的,但它似乎不是最佳的:
void addBatchItem( T item ) {
while ( !batch.offer( item ) ) {
List<T> batched = new ArrayList<>( batchSize );
batch.drainTo( batched );
process( batched );
}
}
Run Code Online (Sandbox Code Playgroud)
然后我想也许有更好的方法,我只是不知道.谢谢!
编辑:
好的,这是一个解决方案(基于ArrayBlockingQueue阻塞):
public void add( T batchItem ) {
while ( !batch.offer( batchItem ) ) {
flush();
}
}
public void flush() {
List<T> batched = new ArrayList<>( batchSize );
batch.drainTo( batched, batchSize );
if ( !batched.isEmpty() )
executor.execute( new PhasedRunnable( batched ) );
}
Run Code Online (Sandbox Code Playgroud)
我想我的问题是,上述是否比基于ConcurrentLinkedQueue的解决方案更适合此目的,因为后者需要为每个节点分配对象?
使用示例类:
public abstract class Batcher<T> {
private final int batchSize;
private ArrayBlockingQueue<T> batch;
private ExecutorService executor;
private final Phaser phaser = new Phaser( 1 );
public Batcher( int batchSize, ExecutorService executor ) {
this.batchSize = batchSize;
this.executor = executor;
this.batch = new ArrayBlockingQueue<>( batchSize );
}
public void add( T batchItem ) {
while ( !batch.offer( batchItem ) ) {
flush();
}
}
public void flush() {
List<T> batched = new ArrayList<>( batchSize );
batch.drainTo( batched, batchSize );
if ( !batched.isEmpty() )
executor.execute( new PhasedRunnable( batched ) );
}
public abstract void onFlush( List<T> batch );
public void awaitDone() {
phaser.arriveAndAwaitAdvance();
}
public void awaitDone( long duration, TimeUnit unit ) throws TimeoutException {
try {
phaser.awaitAdvanceInterruptibly( phaser.arrive(), duration, unit );
}
catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
}
private class PhasedRunnable implements Runnable {
private final List<T> batch;
private PhasedRunnable( List<T> batch ) {
this.batch = batch;
phaser.register();
}
@Override
public void run() {
try {
onFlush( batch );
}
finally {
phaser.arrive();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个简单的例子,一个更完整的例子可能是JPA实体更新或插入.另外,我希望可以同时调用#add.
@Test
public void testOddNumber() {
Batcher<Integer> batcher = new Batcher<Integer>( 10, executor ) {
@Override
public void onFlush( List<Integer> batch ) {
count.addAndGet( batch.size() );
}
};
for ( int at = 0; at != 21; ++at ) {
batcher.add( at );
}
batcher.flush();
batcher.awaitDone();
assertEquals( count.get(), 21 );
}
Run Code Online (Sandbox Code Playgroud)
看似简单,一个带有非阻塞但原子版"添加"和"排水"的集合
这实际上是不可能的.非阻塞算法(在1-CAS archs上)在单个内存地址上工作以实现原子性.因此,无需阻塞和原子地排空整个队列是不可能的.
基于您的编辑,我认为这可能是实现您所需要的最有效方式.
| 归档时间: |
|
| 查看次数: |
1737 次 |
| 最近记录: |