dan*_*car 16 java multithreading synchronization locking
我有一个生产者和许多消费者.
使用更简单的算法可以获得相同的结果吗?使用可重入锁嵌套同步块似乎有点不自然.你可能会注意到任何比赛条件吗?
更新:我找到的第二个解决方案是使用3个集合.一个缓存生产者结果,第二个是阻塞队列,第三个使用列表来跟踪正在进行的任务.再有点复杂.
我的代码版本
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
public class Main1 {
static class Token {
private int order;
private String value;
Token() {
}
Token(int o, String v) {
order = o;
value = v;
}
int getOrder() {
return order;
}
String getValue() {
return value;
}
}
private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
private final static ReentrantLock reentrantLock = new ReentrantLock();
private final static Token STOP_TOKEN = new Token();
private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());
public static void main(String[] args) {
ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
producerExecutor.submit(new Runnable() {
public void run() {
Random random = new Random();
try {
for (int i = 1; i <= 100; i++) {
Token token = new Token(i, String.valueOf(random.nextInt(1)));
queue.put(token);
}
queue.put(STOP_TOKEN);
}catch(InterruptedException e){
e.printStackTrace();
}
}
});
ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
for(int i=1; i<=10;i++) {
// creating to many runnable would be inefficient because of this complex not thread safe object
final Object dependecy = new Object(); //new ComplexDependecy()
consumerExecutor.submit(new Runnable() {
public void run() {
while(true) {
try {
//not in order
Token token = queue.take();
if (token == STOP_TOKEN) {
queue.add(STOP_TOKEN);
return;
}
System.out.println("Task start" + Thread.currentThread().getId() + " order " + token.getOrder());
Random random = new Random();
Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
lockList.remove(token.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}});
}
}}
Run Code Online (Sandbox Code Playgroud)
您可以预先创建一组Runnables将选择传入的任务(令牌)并根据其顺序值将它们放入队列中.
正如评论中所指出的那样,不能保证具有不同值的令牌将始终并行执行(总而言之,您至少可以通过框中的nr个物理核心来限制).但是,保证具有相同顺序的令牌将按到达顺序执行.
示例代码:
/**
* Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
*/
public class TasksOrderingExecutor {
public interface Task extends Runnable {
/**
* @return ordering value which will be used to sequence tasks with the same value.<br>
* Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
*/
String getOrder();
}
private static class Worker implements Runnable {
private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
private volatile boolean stopped;
void schedule(Task task) {
tasks.add(task);
}
void stop() {
stopped = true;
}
@Override
public void run() {
while (!stopped) {
try {
Task task = tasks.take();
task.run();
} catch (InterruptedException ie) {
// perhaps, handle somehow
}
}
}
}
private final Worker[] workers;
private final ExecutorService executorService;
/**
* @param queuesNr nr of concurrent task queues
*/
public TasksOrderingExecutor(int queuesNr) {
Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
workers = new Worker[queuesNr];
for (int i = 0; i < queuesNr; i++) {
Worker worker = new Worker();
executorService.submit(worker);
workers[i] = worker;
}
}
public void submit(Task task) {
Worker worker = getWorker(task);
worker.schedule(task);
}
public void stop() {
for (Worker w : workers) w.stop();
executorService.shutdown();
}
private Worker getWorker(Task task) {
return workers[task.getOrder().hashCode() % workers.length];
}
}
Run Code Online (Sandbox Code Playgroud)
根据代码的性质,保证以串行方式处理具有相同值的令牌的唯一方法是等待STOP_TOKEN到达.
您需要单个生产者 - 单个消费者设置,消费者按其值收集和排序令牌(进入Multimap,比如说).
只有这样,您才能知道哪些令牌可以串行处理,哪些令牌可以并行处理.
无论如何,我建议你看看LMAX Disruptor,这是在线程之间共享数据的非常有效的方法.
它不像Executors那样受到同步开销的影响,因为它是无锁的(这可能会给你很好的性能优势,具体取决于你的数据处理的性质).
// single thread for processing as there will be only on consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());
// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));
inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));
// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));
inboundDisruptor.start();
outboundDisruptor.start();
// publisher code
for (int i = 0; i < 10; i++) {
inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}
Run Code Online (Sandbox Code Playgroud)
入站破坏程序上的事件处理程序只是收集传入的令牌.收到STOP令牌后,它会将一系列令牌发布到出站扰乱器以进行进一步处理:
public class InEventHandler implements EventHandler<InEvent> {
private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
private Disruptor<OutEvent> outboundDisruptor;
public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
this.outboundDisruptor = outboundDisruptor;
}
@Override
public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
if (event.token == STOP_TOKEN) {
// publish indexed tokens to outbound disruptor for parallel processing
tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
} else {
tokensByValue.put(event.token.value, event.token);
}
}
}
Run Code Online (Sandbox Code Playgroud)
出站事件处理程序按顺序处理相同值的标记:
public class OutEventHandler implements EventHandler<OutEvent> {
private final long order;
private final long allHandlersCount;
private Object yourComplexDependency;
public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
this.order = order;
this.allHandlersCount = allHandlersCount;
this.yourComplexDependency = yourComplexDependency;
}
@Override
public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
if (sequence % allHandlersCount != order ) {
// round robin, do not consume every event to allow parallel processing
return;
}
for (Token token : event.tokensToProcessSerially) {
// do procesing of the token using your complex class
}
}
}
Run Code Online (Sandbox Code Playgroud)
其余所需的基础架构(Disruptor docs中描述的目的):
public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {
public static final InEventTranslator INSTANCE = new InEventTranslator();
@Override
public void translateTo(InEvent event, long sequence, Token arg0) {
event.token = arg0;
}
}
public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {
public static final OutEventTranslator INSTANCE = new OutEventTranslator();
@Override
public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
event.tokensToProcessSerially = tokens;
}
}
public class InEvent {
// Note that no synchronization is used here,
// even though the field is used among multiple threads.
// Memory barrier used by Disruptor guarantee changes are visible.
public Token token;
}
public class OutEvent {
// ... again, no locks.
public Collection<Token> tokensToProcessSerially;
}
public class Token {
String value;
}
Run Code Online (Sandbox Code Playgroud)
如果你有很多不同的令牌,那么最简单的解决方案是创建一个线程执行者一定数目(约2倍的核数),然后每个任务分配给它的令牌的哈希值决定的执行.
这样,具有相同令牌的所有任务将转到相同的执行器并按顺序执行,因为每个执行程序只有一个线程.
如果您对调度公平性有一些未说明的要求,那么通过让生产者线程在分发它们之前排队其请求(或阻止)来避免任何严重的不平衡是很容易的,直到每个执行者的未完成请求少于10个请求为止. .
| 归档时间: |
|
| 查看次数: |
2281 次 |
| 最近记录: |