我对ExecutorService#shutdown方法的javadoc感到困惑.这些矛盾的陈述不是吗?
启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务.此方法不会等待先前提交的任务完成执行.使用awaitTermination来做到这一点.
如果它可以有序地关闭以前提交的任务,那么它怎么能等待它们完成执行呢?
我有一个工作单元流,我们称之为"工作项",按顺序处理(现在).我想通过多线程工作加快处理速度.
约束:这些工作项按特定顺序排列,在处理过程中订单不相关 - 但一旦处理完成,订单必须恢复.
像这样的东西:
|.|
|.|
|4|
|3|
|2| <- incoming queue
|1|
/ | \
2 1 3 <- worker threads
\ | /
|3|
|2| <- outgoing queue
|1|
Run Code Online (Sandbox Code Playgroud)
我想在Java中解决这个问题,最好不要使用Executor Services,Futures等,而是使用基本的并发方法,如wait(),notify()等.
原因是:我的工作项目非常小且细粒度,它们在每个约0.2毫秒内完成处理.所以我担心使用来自java.util.concurrent.*的东西可能会引入很多开销并减慢我的代码速度.
到目前为止我发现的例子都保留了处理过程中的顺序(这与我的情况无关),并且在处理后并不关心顺序(在我的情况下这是至关重要的).
java concurrency performance multithreading java.util.concurrent
我对Java中的代码重新排序和竞争条件有疑问.
假设我有以下代码,同时执行两个或多个线程workForThread():
public class Job {
private Lock lock = new ReentrantLock();
private int sharedObject = 1;
public void workForThread() {
lock.lock();
try {
sharedObject++;
} finally {
lock.unlock();
}
}
}
Run Code Online (Sandbox Code Playgroud)
JVM是否可能以错误的顺序执行此操作?例如,以下重新排序是否可能?:
sharedObject++;
lock.lock();
lock.unlock();
Run Code Online (Sandbox Code Playgroud)
或者是否保证锁不会被重新排序?
java concurrency multithreading thread-safety java.util.concurrent
我正在阅读ConcurrentHashMapJDK8中的源代码,请注意,该代码TreeBin使用“读写”锁来防止并发读写。
如果没有并发的写线程试图修改树结构,则读线程将通过TreeNodes。当“查找”操作完成时,读取线程可能会:
(1)lockState如果存在,则“ CAS” 和“取消”服务线程。
以下是源代码中的“ find()”方法。
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> …Run Code Online (Sandbox Code Playgroud) 我想在两个对象之间实现一个简单的比较器,它的唯一要求是
.compare当且仅当对象相同时才返回 0 。会Comparator.comparing(System::identityHashCode)工作吗?还有其他方法吗?
动机: 我想构建一个集合,允许我将时间戳消息存储在线程安全集合中,该集合将支持诸如“获取时间戳位于 [a,b) 中的所有消息”之类的查询。
似乎番石榴TreeMultimap使用全局锁(编辑:如果用synchronizedSortedSetMultimap包装器包装),并且ConcurrentSkipListMap似乎每次只支持一个条目(它是一张地图,而不是多张地图)。所以我想只使用一组对:
ConcurrentSkipListSet<ImmutablePair<Float,Message>> db,
其中对按词法排序,首先按时间(使用Float.compareTo),然后按类似Comparator.nullsFirst(Comparator.comparing(System::identityHashCode)).
的nullsFirst是有只是让db.subSet(ImmutablePair.of(a,null), ImmutablePair.of(b,null))查询半开时间区间[a,b)中。
你明白为什么我关心比较器保持相同性:如果消息比较器对不相同的消息返回零,消息可能会被删除。
你也明白了为什么我不需要比较器的其他东西:它就在那里,所以我可以使用ConcurrentSkipListSet. 我当然不想强加于用户(好吧,只是我 :-) 为Message.
另一种可能的解决方案是使用 a ConcurrentSkipListMap<Float, Set<Message>>(带有线程安全的 Set<> 实例),但在内存方面似乎有点浪费,一旦消息被删除,我需要自己删除 emptySet 以节省内存。
编辑:正如一些人所指出的,identityHashCode 可能会产生冲突,实际上我现在已经确认在我的设置中存在这种冲突(大致相当于上面的 4K 集合,每个集合每个时间段填充 4K 消息)。这很可能是我看到一些消息丢失的原因。所以我现在比以往任何时候都更有兴趣找到某种方法来拥有一个真正尊重相同性的“不可知”比较运算符。实际上,64 位哈希值(而不是 identityHashCode 提供的 32 位值)可能就足够了。
java java.util.concurrent comparator skip-lists concurrentskiplistmap
所有!
我在LinkedBlockingQueue中发现了奇怪的代码:
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
Run Code Online (Sandbox Code Playgroud)
谁能解释为什么我们需要局部变量h?它对GC有什么帮助?
我正在寻找一个AtomicStampedReference和/或AtomicMarkableReference的例子,它可以帮助我理解这些类及其功能.我无法通过网络获得任何质量示例.
我可以考虑在垃圾收集中使用它们,但一个高质量的例子将帮助我更好地理解它们.
请告诉我有什么区别信号量用1和0初始化.如下:
public static Semaphore semOne = new Semaphore(1);
Run Code Online (Sandbox Code Playgroud)
和
public static Semaphore semZero = new Semaphore(0);
Run Code Online (Sandbox Code Playgroud) 今天我尝试了CompletableFutureJava 8中的"new" ,当我找不到runAsync(Callable)方法时发现自己很困惑.我可以自己做,如下图所示,但为什么这(对我来说非常明显和有用的实用方法)缺失?上午我失去了一些东西?
public static <T> CompletableFuture<T> asFuture(Callable<? extends T> callable, Executor executor) {
CompletableFuture<T> future = new CompletableFuture<>();
executor.execute(() -> {
try {
future.complete(callable.call());
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
return future;
}Run Code Online (Sandbox Code Playgroud) 我在我的多线程应用程序中使用了Java Executors,但我似乎无法弄清楚何时最好使用以下各种方法:
1.
ExecutorService executor=Executors.newFixedThreadPool(50);
executor.execute(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
Run Code Online (Sandbox Code Playgroud)
2.
int Page_Count=200;
ExecutorService executor=Executors.newFixedThreadPool(50);
doneSignal=new CountDownLatch(Page_Count);
for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));
doneSignal.await();
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
Run Code Online (Sandbox Code Playgroud)
3.
int Executor_Count=30;
ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());
List<Future<String>> futures=new ArrayList<>(3330);
for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS); }
for (Future<String> future : futures)
{
String f=future.get();
// ...
}
Run Code Online (Sandbox Code Playgroud)
具体来说,在[2]中,如果我跳过doneSignal,那么它就像[1],那么doneSignal的用途是什么?
另外,在[3]中,如果我添加一个doneSignal怎么办?或者有可能吗?
我想知道的是:这些方法是否可以互换,或者是否存在我应该使用上述特定类型的某种情况?
java multithreading executorservice java.util.concurrent threadpoolexecutor
java ×10
concurrency ×4
comparator ×1
deadlock ×1
java-8 ×1
performance ×1
puzzle ×1
skip-lists ×1