SubmissionPublisher在提交时未调用onNext订阅者

use*_*531 4 java publish-subscribe java-9

每隔一段时间,我都会通过某个查询来检索推文。这些推文必须传递给计算和操纵这些推文的服务。因此,这些服务已订阅我的发布者。因此Publisher.hasSubscribers()返回true。但是Submit或offer函数不会调用我的订阅者的onNext。因此,作为“修复”,我循环浏览订户并自己调用它。但是事实并非如此。

这是我的发布者的构造函数。

 public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){
    super(executor, maxBufferCapacity);
    this.searchQuery = searchQuery;
    scheduler = new ScheduledThreadPoolExecutor(1);
    this.tweetGetter = scheduler.scheduleAtFixedRate(
            () -> {
               List<String> tweets = getTweets(searchQuery);
               /* this.lastCall = LocalDateTime.now();
                for(Flow.Subscriber sub : this.getSubscribers()){
                    sub.onNext(tweets);
                }*/
               this.submit(tweets);
                if(tweets.size() >= 20) this.close();
            }, 0, period, unit);
}
Run Code Online (Sandbox Code Playgroud)

这是我的订户

    package myFlowAPI;

import Interfaces.IProcess;
import Services.LogToFileService;

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class MySubscriber implements Flow.Subscriber<List<String>> {
private Flow.Subscription subscription;
private AtomicInteger count;

private IProcess processor;

private String name;
private int DEMAND = 0;

public MySubscriber(String name, IProcess processor){
    this.name = name;
    this.processor = processor;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
}


@Override
public void onNext(List<String> item) {
    Object result = this.processor.process(item);
    this.readResult(result);

    switch (this.processor.getClass().getSimpleName()){
        case "CalculateTweetStatsService":
            if((Integer) result >= 20){
                this.subscription.cancel();
            }
            break;
    }
}

@Override
public void onError(Throwable throwable) {
    System.out.println("Error is thrown " + throwable.getMessage());
}

@Override
public void onComplete() {
    if(this.processor instanceof LogToFileService){
        ((LogToFileService) processor).closeResource();
    }
    System.out.println("complete");
}

private void readResult(Object result){
    System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString());
}
}
Run Code Online (Sandbox Code Playgroud)

这是我订阅发布者的主要地点

public static void main(String[] args) {
    ScheduledExecutorService  executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    String searchQuery;
    try{
       searchQuery = args[0] != null ? args[0] : "#capgemini50";
    }catch (ArrayIndexOutOfBoundsException ex){
        searchQuery = "#capgemini50";
    }

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService());
    streamer.subscribe(subscriber1);
    streamer.subscribe(subscriber2);

}
Run Code Online (Sandbox Code Playgroud)

man*_*uti 6

您需要订阅者明确请求数据,例如在订阅时(请参阅http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):

@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(1);
}
Run Code Online (Sandbox Code Playgroud)

在onNext()中处理以请求下一个项目时也是如此。

  • 我的英雄。非常感谢!!`@Override public void onNext(List &lt;String&gt; item){对象结果= this.processor.process(item); this.readResult(result); 切换(this.processor.getClass()。getSimpleName()){情况“ CalculateTweetStatsService”:if((Integer)result&gt; = 20){this.subscription.cancel(); } break; } this.subscription.request(1); }` (2认同)