I am trying to write a multithreaded web crawler.
My main entry class has the following code:
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while (true) {
    URL url = frontier.get();
    if (url == null)
        return;
    exec.execute(new URLCrawler(this, url));
}
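URLCrawler fetches the specified URL, parses the HTML to extract links from it, and schedules unseen links back to the frontier.
The frontier is a queue of uncrawled URLs. The problem is how to write the get() method. If the queue is empty, it should wait until any URLCrawlers finish and then try again. It should return null only when the queue is empty and there is no currently active URLCrawler.
My first idea was to use an AtomicInteger to count the currently working URLCrawlers, plus an auxiliary object for notifyAll()/wait() calls. Each crawler increments the count when it starts, decrements it when it exits, and notifies the object that it has finished.
But I have read that notify()/notifyAll() and wait() are somewhat discouraged ways of doing thread communication.
What should I use for this work pattern? It is similar to M producers and N consumers; the question is how to deal with the exhaustion of producers.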
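One possible way to get the termination condition described above, without wait()/notifyAll(), is to count every URL that is either queued or still being crawled. The sketch below is only an illustration under assumptions: the class name FrontierSketch, the add()/done() methods, the use of String instead of URL, and the 50 ms polling interval are all hypothetical and not taken from the question.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical frontier; names and the String URL type are illustrative assumptions.
class FrontierSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Number of URLs that are either queued or currently being crawled.
    private final AtomicInteger pending = new AtomicInteger();

    void add(String url) {
        pending.incrementAndGet(); // count first, so pending is never 0 while a URL is queued
        queue.add(url);
    }

    // A crawler calls this once per URL, after it has added any discovered links.
    void done() {
        pending.decrementAndGet();
    }

    // Returns the next URL, or null once the queue is empty and no crawler is active.
    String get() throws InterruptedException {
        while (true) {
            String url = queue.poll(50, TimeUnit.MILLISECONDS);
            if (url != null) return url;
            if (pending.get() == 0) return null;
        }
    }

    // Toy crawl: every "page" shorter than 3 characters links to two new pages.
    static int crawlAll() {
        FrontierSketch frontier = new FrontierSketch();
        AtomicInteger crawled = new AtomicInteger();
        ExecutorService exec = Executors.newFixedThreadPool(3);
        frontier.add("a");
        try {
            while (true) {
                String url = frontier.get();
                if (url == null) break;
                exec.execute(() -> {
                    try {
                        crawled.incrementAndGet();
                        if (url.length() < 3) {
                            frontier.add(url + "0");
                            frontier.add(url + "1");
                        }
                    } finally {
                        frontier.done(); // always release the slot, even on failure
                    }
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return crawled.get();
    }

    public static void main(String[] args) {
        System.out.println("crawled " + crawlAll() + " URLs");
    }
}
```

Because add() increments the counter before the URL becomes visible in the queue, and done() runs only after a crawler has scheduled its links, a pending count of zero implies that nothing is queued and nothing can still be added. The timed poll is a simplification; a Lock with a Condition would avoid polling at the cost of more code.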
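From the Javadoc I know that ConcurrentHashMap.replace is atomic, but what about ConcurrentHashMap.put? I can see that they are implemented differently in the source code, but I cannot figure out the difference. Could any guru give some guidelines on how to use these two methods?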
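As a rough illustration of the behavioral difference being asked about: each call is atomic on its own, but put() stores the value unconditionally, while replace() only acts when a mapping (or, in the three-argument form, a specific expected value) is present. The class name PutVsReplace and the key "k" are made up for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

class PutVsReplace {
    static String demo() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // replace(key, value) does nothing when the key has no mapping.
        Integer r1 = map.replace("k", 1);
        boolean stillAbsent = !map.containsKey("k");

        // put(key, value) always stores the value and returns the previous one (or null).
        Integer p1 = map.put("k", 1);
        Integer p2 = map.put("k", 2);

        // replace(key, expected, newValue) succeeds only if the current value equals expected.
        boolean stale = map.replace("k", 1, 3); // current value is 2, so this fails
        boolean fresh = map.replace("k", 2, 3); // matches, so this succeeds

        return r1 + "," + stillAbsent + "," + p1 + "," + p2 + ","
                + stale + "," + fresh + "," + map.get("k");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The conditional form is what makes a read-modify-write loop safe: read the value, compute a new one, and retry if replace(key, old, new) reports that another thread got there first.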
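I am trying to understand the utilities in the java.util.concurrent package, and I learned that we can submit callable objects to an ExecutorService, which returns a Future that is filled with the value returned by the callable once the task in its call() method completes successfully.
I understand that all the callables are executed concurrently using multiple threads.
When I wanted to see how much improvement ExecutorService gives over batch task execution, I thought of measuring the time.
Here is the code I tried to execute: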
package concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorExample {
    private static Callable<String> callable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < 5; i++) {
                builder.append(i);
            }
            return builder.toString();
        }
    };

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
ExecutorService …

I don't understand the difference between these two:
AtomicReference<Integer> atomicReference = new AtomicReference<>(1);
and
AtomicInteger atomicInteger = new AtomicInteger(1);
Can someone explain, in general, when to use AtomicReference? I hope someone can help me. Thanks.
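A small sketch may help frame the comparison. AtomicInteger offers atomic arithmetic such as incrementAndGet() and addAndGet(), whereas AtomicReference atomically swaps a reference to any object and compares with ==, not equals(). The Range class and the method names below are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AtomicRefVsInt {
    // Hypothetical immutable value with two fields that must change together.
    static final class Range {
        final int lo;
        final int hi;
        Range(int lo, int hi) { this.lo = lo; this.hi = hi; }
    }

    // AtomicInteger: numeric updates are built in.
    static int countWithAtomicInteger() {
        AtomicInteger counter = new AtomicInteger(1);
        counter.incrementAndGet();
        counter.addAndGet(5);
        return counter.get();
    }

    // AtomicReference: replace a whole immutable object in one atomic step.
    static Range widen(AtomicReference<Range> ref) {
        return ref.updateAndGet(r -> new Range(r.lo - 1, r.hi + 1));
    }

    // compareAndSet on AtomicReference compares references, so pass the object that was read.
    static int bumpBoxed(AtomicReference<Integer> ref) {
        Integer old;
        do {
            old = ref.get();
        } while (!ref.compareAndSet(old, old + 1));
        return ref.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomicInteger());
        Range r = widen(new AtomicReference<>(new Range(0, 10)));
        System.out.println(r.lo + ".." + r.hi);
        System.out.println(bumpBoxed(new AtomicReference<>(1000)));
    }
}
```

For a plain counter, AtomicInteger is the more direct fit; AtomicReference tends to be useful when the shared state is an object, for example an immutable snapshot that several threads replace.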
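Suppose I need to handle 3 types of requests: A, B, and C, as follows:
More generally, the number of types is N and the number of concurrent requests is K <= N.
How would you implement it in Java with java.util.concurrent?
According to the documentation, AtomicInteger.incrementAndGet() is atomic. However, in the source code below, what if another thread interleaves just before "return next"? Will "next" be incorrect then?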
public final long incrementAndGet() {
    for (;;) {
        long current = get();
        long next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}
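To make the reasoning concrete: next is a local variable, so no other thread can change it, and a successful compareAndSet means this thread is the one that moved the counter from current to next. The following sketch re-implements the same loop on top of AtomicLong and checks that every call returns a distinct value; the class name CasLoopDemo and the thread counts are arbitrary choices for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class CasLoopDemo {
    private final AtomicLong value = new AtomicLong();

    // Same retry loop as in the quoted source, written against AtomicLong.
    long incrementAndGet() {
        for (;;) {
            long current = value.get();
            long next = current + 1;          // local to this thread
            if (value.compareAndSet(current, next))
                return next;                  // this thread produced exactly this value
        }
    }

    // Returns {final counter value, number of distinct values returned}.
    static long[] run(int threads, int perThread) {
        CasLoopDemo counter = new CasLoopDemo();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(counter.incrementAndGet());
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return new long[] { counter.value.get(), seen.size() };
    }

    public static void main(String[] args) {
        long[] result = run(4, 10_000);
        System.out.println("final=" + result[0] + " distinct=" + result[1]);
    }
}
```

If another thread increments the counter after the compareAndSet but before the return, the returned value is simply no longer the latest one; it is still the value that this particular increment produced, which is all the method promises.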
I don't understand why I have to call executorService.shutdown() explicitly to terminate the executorService. If I don't call shutdown(), the JVM does not terminate on its own.
What is wrong with my program, or what concept am I missing?
public class ExecutorServiceExample {
    public static class Task1 implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Message from Task1 :"
                    + Thread.currentThread().getName());
        }
    }

    public static class Task2 implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Message from Task2 :"
                    + Thread.currentThread().getName());
        }
    }
public static class Task3 implements Runnable …

A colleague pointed out the following snippet from the Log4J 2.3 code:
/**
 * Called to determine if the configuration has changed.
 */
@Override
public void checkConfiguration() {
    final long current = System.currentTimeMillis();
    if (((counter.incrementAndGet() & MASK) == 0) && (current >= nextCheck)) {
        LOCK.lock();
        try {
            nextCheck = current + intervalSeconds;
            if (file.lastModified() > lastModified) {
                lastModified = file.lastModified();
                for (final ConfigurationListener listener : listeners) {
                    final Thread thread = new Thread(new ReconfigurationWorker(listener, reconfigurable));
                    thread.setDaemon(true);
                    thread.start();
                }
            }
        } finally {
            LOCK.unlock();
        }
    }
}
where counter is a …

I have a ThreadPoolExecutor and I submit tasks to it.
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
This code submits the Runnable to the ThreadPoolExecutor.
protected void waitAndSweep(final String symbol) {
    runnable = new Runnable() {
        public void run() { /* irrelevant code */ }
    };
    try {
        Future<?> self = threadPoolExecutor.submit(runnable);
        futures.add(self);
    } catch (RejectedExecutionException re) {
        /* this exception will be thrown when wait and sweep is called more than twice.
         * threadPoolExecutor can have one running task and one waiting task.
         */
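} …

I stumbled upon the difference between the CyclicBarrier and Phaser utilities in the Java concurrency package.
I understand that CyclicBarrier allows a group of threads to wait until all of them reach a specific point. Phaser does the same, but it supports multiple phases. I also understand that a CyclicBarrier can be reused. I think this reuse facility makes its functionality the same as Phaser's.
Consider the following programs:
Testing Phaser: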
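For reference, the reuse mentioned above can be sketched as follows: the same CyclicBarrier is awaited once per phase, and its optional barrier action runs each time the barrier trips. The class name BarrierReuseSketch and the trip counter are illustrative assumptions. One difference worth keeping in mind while comparing is that a CyclicBarrier has a fixed number of parties, while a Phaser lets parties join and leave through register() and arriveAndDeregister().

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseSketch {
    // Runs `parties` threads through `phases` rounds on one barrier; returns how often it tripped.
    static int runPhases(int parties, int phases) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int phase = 0; phase < phases; phase++) {
                        barrier.await(); // the barrier resets itself after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runPhases(3, 2) + " times");
    }
}
```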
import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) {
        Phaser p = new Phaser(3);
        Thread t1 = new Thread(() -> process(p), "T1");
        Thread t2 = new Thread(() -> process(p), "T2");
        Thread t3 = new Thread(() -> process(p), "T3");
        t1.start();
        t2.start();
        t3.start();
    }

    private static void process(Phaser p) {
        try {
            System.out.println("Started Phase 1: " + Thread.currentThread().getName());
            p.arriveAndAwaitAdvance();
            System.out.println("Finished Phase 1: " + Thread.currentThread().getName());
            System.out.println("Started Phase 2: " + Thread.currentThread().getName());
            p.arriveAndAwaitAdvance();
            System.out.println("Finished Phase 2: " + Thread.currentThread().getName());
} …