我使用 Java 邮件 API 接收来自多个 Gmail 帐户的消息。不同的帐户由不同的线程处理,我使用 aLinkedBlockingQueue来存储电子邮件。但是,我不希望将同一电子邮件重复添加到Queue. 这是我到目前为止的代码:
public synchronized void readMail(){
try {
boolean alreadyAdded = false;
Folder inbox = store.getFolder("Inbox");
inbox.open(Folder.READ_ONLY);
Message [] received = inbox.getMessages();
if(messages.isEmpty()){
for(Message newMessage:received){
System.out.println("Queue empty, adding messages");
messages.put(newMessage);
}
}
else{
for(Message existingMessage:messages){
for(Message newMessage:received){
if (alreadyAdded == true)
break;
else{
if(existingMessage.getSubject().equals(newMessage.getSubject())){
alreadyAdded = true;
System.out.println("boolean changed to true, message "+newMessage.getSubject()+"won't be added");
}
else{
alreadyAdded = false;
System.out.println("Non-duplicate message "+newMessage.getSubject());
messages.put(newMessage);
}
}
}
}
} …Run Code Online (Sandbox Code Playgroud) 请快速澄清一下
我知道BlockingQueues线程安全.
这是否意味着我可以将一个阻止队列的单个引用传递给所有生产者,这些生产者可以毫不犹豫地丢弃事件以供单个消费者使用,并且什么都不会中断?
否则不得不产生多达20个BlockingQueues,可能有或没有定期更新,并以任何效率读取它们似乎是一项不可逾越的任务.
TL; 博士; 我需要知道是否有一个具有持久阻塞队列的库。
我有一个经典的生产者/消费者计划。他们共享一个LinkedBlockingQueue来共享数据,我BlockingQueue#take在消费者中使用方法,因为我需要他们永远活着等待新元素。
问题是我有很多数据,我不能丢失它们。即使在消费者停止之后,生产者也可以坚持生成一些数据。我正在考虑H2在达到某个阈值后实现我的 BlockingQueue ta 用于存储/获取数据。我的主要问题是我需要性能,我需要按照元素的创建顺序使用它们。
是否有持久阻塞队列的实现可以用于这样的事情?如果没有,对我实现这样的目标有什么建议吗?
我正在我的程序中使用阻塞队列实现.我想知道线程将等待一个元素出队多长时间.我的客户通过民意调查回复,我的服务器线程提供了消息.我的代码如下;
private BlockingQueue<Message> applicationResponses= new LinkedBlockingQueue<Message>();
Run Code Online (Sandbox Code Playgroud)
客户:
Message response = applicationResponses.take();
Run Code Online (Sandbox Code Playgroud)
服务器:
applicationResponses.offer(message);
Run Code Online (Sandbox Code Playgroud)
我的客户线程会永远等待吗?我想配置那个时间..(例如:1000ms)..这可能吗?
有没有人知道,当迭代C#BlockingCollection <>时,是否从集合中获取元素,就像BlockingCollection.Take()所做的那样?
BlockingCollection<int> q = new BlockingCollection<int>();
[...]
foreach(int i in q)
{
//does q still contain i?
}
Run Code Online (Sandbox Code Playgroud)
谢谢
编辑:当然我的意思是BlockingCollection,但由于某种原因,我的头脑中有了BlockingQueue并使用了它.
理想情况下,我想将阻塞队列添加到选择器,以便我可以阻止从套接字读取或直到阻塞队列中出现项目。
是否有一些更高级别的类似选择器的函数可以对这样的不同类型进行操作?
我可以采取一种俗气的方式,启动 2 个线程,并让每个线程单独阻塞,但如果有一个函数可以阻塞两种类型的对象,那就更干净了。
有没有一种方法可以获取每个被阻止的对象的监视器,并使用类似选择器的对象来阻止这两个对象?
我有一个与此类似的代码,它位于run()aRunnable和多个实例的内部方法中Runnable,
do{
try{
String contractNum=contractNums.take();
}catch(InterruptedException e){
logger.error(e.getMessage(), e);
}
}while(!("*".equals(contractNum)));
Run Code Online (Sandbox Code Playgroud)
哪里contractNums是BlockingQueue<String>多线程共享的。Runnables这个队列有单独的放置元素。
我不确定捕获后的后续步骤InterruptedException,我应该通过重新抛出 a 来终止该线程RuntimeException(因此我的while循环终止)还是尝试contractNum queue再次获取下一个元素并忽略InterruptedException?
我不确定是否InterruptedException被视为线程终止或将其保留在 while 循环中的致命条件。
请建议。
首先,为了防止不喜欢读到最后的人将问题标记为重复,我已经阅读了“生产者-消费者日志服务”和“关闭问题的不可靠方式” 。但它并没有完全回答问题,并且回答与书本相矛盾。
在书中提供了以下代码:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try …Run Code Online (Sandbox Code Playgroud) 首先:我已经阅读了以下两个问题及其可能的解决方案:
我遇到的困境是我想使用自定义队列BlockingQueue,或者更确切地说,使用不同但特定的队列,即PriorityBlockingQueue使用自定义队列Comparator使用按优先级对队列进行排序的自定义
它ThreadPoolExecutor确实支持在其构造函数中自定义队列,但它不实现接口中的方法ScheduledExecutorService。所以我去找了子类ScheduledThreadPoolExecutor,但它不支持自定义队列并使用DelayedWorkQueue。
问题:
ScheduledThreadPoolExecutor因为为我自己的类创建构造函数不会做任何事情,因为ScheduledThreadPoolExecutor不接受自定义队列作为参数。ThreadPoolExecutor和实现,ScheduledThreadPoolExecutor因为它使用了许多没有修饰符声明的方法(例如canRunInCurrentState(boolean periodic),此调用调用的所有方法),这不允许我访问该方法,因为即使它的子类ThreadPoolExecutor,它不在同一个包中。我当前的实现如下所示:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.croemheld.tasks.PriorityTaskComparator;
public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
private static final int INITIAL_QUEUE_SIZE = 10;
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, …Run Code Online (Sandbox Code Playgroud)
我在 databricks 中有一个用例,其中必须对 URL 数据集进行 API 调用。该数据集大约有 100K 条记录。允许的最大并发数是 3。
我在 Scala 中实现并在 databricks 笔记本中运行。除了队列中待处理的一个元素之外,我觉得这里还缺少一些东西。
阻塞队列和线程池是解决这个问题的正确方法吗?
在下面的代码中,我进行了修改,而不是从数据集中读取,而是在 Seq 上进行采样。任何帮助/想法将不胜感激。
import java.time.LocalDateTime
import java.util.concurrent.{ArrayBlockingQueue,BlockingQueue}
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit;
var inpQueue:BlockingQueue[(Int, String)] = new ArrayBlockingQueue[(Int, String)](1)
val inpDS = Seq((1,"https://google.com/2X6barD"), (2,"https://google.com/3d9vCgW"), (3,"https://google.com/2M02Xz0"), (4,"https://google.com/2XOu2uL"), (5,"https://google.com/2AfBWF0"), (6,"https://google.com/36AEKsw"), (7,"https://google.com/3enBxz7"), (8,"https://google.com/36ABq0x"), (9,"https://google.com/2XBjmiF"), (10,"https://google.com/36Emlen"))
val pool = Executors.newFixedThreadPool(3)
var i = 0
inpDS.foreach{
ix => {
inpQueue.put(ix)
val t = new ConsumerAPIThread()
t.setName("MyThread-"+i+" ")
pool.execute(t)
}
i = i+1
}
println("Final Queue Size = " +inpQueue.size+"\n")
class …Run Code Online (Sandbox Code Playgroud) functional-programming scala blockingqueue apache-spark databricks
blockingqueue ×10
java ×8
concurrency ×4
apache-spark ×1
behavior ×1
c# ×1
databricks ×1
foreach ×1
jakarta-mail ×1
scala ×1
shutdown ×1
sockets ×1