标签: reactive-streams

Scala Slick:永无止境

使用Slick,您可以执行以下操作以从表中生成结果流:

val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)

p.foreach { s => println(s"Event: $s") }
Run Code Online (Sandbox Code Playgroud)

这将打印events表中的所有事件并在最后一行之后终止.

假设您可以通过某种方式通知您何时在events表中输入新行,是否可以编写一个在插入事件时连续输出事件的流?一种tail -f用于DB表的.

我认为Slick本身不支持这个,但我认为应该可以使用Akka流媒体来提供帮助.因此,如果您可以从Slick Source获取某些东西,直到它为空,那么等待一个事件来指示表中的更多数据,然后流式传输新数据.可能通过使用一个ActorPublisher绑定这个逻辑?

只是想知道某人是否有这方面的经验或任何建议?

scala slick reactive-streams akka-stream

6
推荐指数
1
解决办法
859
查看次数

Akka Streams按类型分割流

我有以下简单的案例类层次结构:

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message
Run Code Online (Sandbox Code Playgroud)

我有一个Flow[Message, Message, NotUsed](来自基于Websocket的协议,已经有编解码器).

我想将它解复用Flow[Message]为Foo和Baz类型的单独流程,因为它们由完全不同的路径处理.

这样做最简单的方法是什么?应该是显而易见的,但我遗漏了一些东西......

scala akka reactive-streams akka-stream

6
推荐指数
1
解决办法
1287
查看次数

akka流消耗web套接字

开始使用akka-streams我想构建一个简单的例子.在使用Web套接字插件的chrome中,我只需连接到这样的流https://blockchain.info/api/api_websocket via wss://ws.blockchain.info/inv和发送2命令

  • {"op":"ping"}
  • {"op":"unconfirmed_sub"} 将在chromes web socket插件窗口中传输结果.

我试图在akka流中实现相同的功能,但我遇到了一些问题:

  • 执行了2个命令,但实际上我没有获得流输出
  • 相同的命令执行两次(ping命令)

遵循http://doc.akka.io/docs/akka/2.4.7/scala/http/client-side/websocket-support.htmlhttp://doc.akka.io/docs/akka的教程时-http/10.0.0/scala/http/client-side/websocket-support.html#half-closed-client-websockets 以下是我的改编:

object SingleWebSocketRequest extends App {

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

import system.dispatcher

// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)
    }

      val commandMessages = Seq(TextMessage("{\"op\":\"ping\"}"), TextMessage("{\"op\":\"unconfirmed_sub\"}"))
      val helloSource: Source[Message, NotUsed] = Source(commandMessages.to[scala.collection.immutable.Seq])

      // the Future[Done] is the materialized value of Sink.foreach
      // and it …
Run Code Online (Sandbox Code Playgroud)

scala akka reactive-streams akka-stream

6
推荐指数
1
解决办法
1369
查看次数

如何反应下载和处理大数据?

我需要通过HTTP启动一些内容的下载,然后将数据作为反应流来读取.

因此,即使下载的数据很大,我几乎可以立即读取响应体的前几个字节(无需等待整个响应体).然后,做一些计算,并在几秒钟内读取另一部分数据.缓存数据必须有一些限制,因为操作内存无法处理整个内容(数十GB).

我一直在尝试使用HttpClientsendAsync方法BodyHandlers.ofInputStream(),但它总是阻塞并等待所有数据到达.

HttpClient client = HttpClient.newHttpClient();

HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("https://..."))
    .build();

HttpResponse<InputStream> response = client
    .sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
    .get(); // this finishes as soon as the header is received

try {
    InputStream stream = response.body();
    byte[] test = stream.readNBytes(20); // trying to read just a few bytes
                                         // but it waits for the whole body
} catch (IOException ex) {}
Run Code Online (Sandbox Code Playgroud)

我需要更改什么才能逐渐下载响应正文?

java http reactive-streams java-http-client java-11

6
推荐指数
1
解决办法
82
查看次数

流库中的拉模型与推模型中更容易实现哪些操作(反之亦然)?

Monix的作者说将Monix与FS2进行比较

FS2更好的地方:

  • 生产者和消费者之间的沟通模型是基于拉动的,有时使实施新运营商变得更加容易

Monix更好的地方:

  • 生产者和消费者之间的沟通模型是基于推送的(带有背压),这使其本质上更加有效

很少有问题出现:

  • 在基于拉的模型中,哪些操作更容易实现?
  • 是否存在更难通过这种方式实现的操作?
  • 为什么基于拉的方法本质上比较慢?

scala reactive-streams monix fs2

6
推荐指数
0
解决办法
219
查看次数

Java reactor - 将 Mono&lt;Void&gt; 与另一个产生 Mono&lt;Object&gt; 的异步任务链接起来

我有以下异步任务:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)
public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)

和下面的服务类:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor reactive-streams

6
推荐指数
1
解决办法
4174
查看次数

Java 9 Flow SubmissionPublisher提供方法的行为

我一直在玩Java Flow操作系统,offer但在阅读完文档后做了我的测试我不明白.

在这里我的测试

@Test
public void offer() throws InterruptedException {
    //Create Publisher for expected items Strings
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.subscribe(new CustomSubscriber<>());
    publisher.offer("item", (subscriber, value) -> false);
    Thread.sleep(500);
}
Run Code Online (Sandbox Code Playgroud)

offer服务器接收一个要发出的项和一个BiPredicate函数,据我所知阅读文档,只有在谓词函数为true的情况下才会发出它.

Bur通过测试结果是

Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
Run Code Online (Sandbox Code Playgroud)

结果没有变化,如果不是假,我返回true.

任何人都可以向我解释一下这个算子好一点.

java reactive-streams java-9 java-flow

5
推荐指数
1
解决办法
546
查看次数

Schedulers.newElastic 和 Schedulers.elastic 方法之间有什么区别?

我正在研究 Flux 和 Mono,并在多线程环境中使用它们,并使用提供工作线程的 Schedular。

使用elastic、parallel 和newElastic 启动Schedular 有很多选项。

这是我使用的代码:

    System.out.println("------ elastic ---------  ");
    Flux.range(1, 10)
      .map(i -> i / 2)
      .publishOn(Schedulers.elastic()).log()
      .blockLast();
    
    System.out.println("------ new elastic ---------  ");
    Flux.range(1, 10)
      .map(i -> i / 2).log()
      .publishOn(Schedulers.newElastic("my")).log()
      .blockLast();
Run Code Online (Sandbox Code Playgroud)

并且他们都有相同的文档:

调度程序动态创建基于 ExecutorService 的 Workers 并缓存线程池,并在 Workers 关闭后重用它们。

创建线程池的最大数量没有限制。

未使用的线程池的默认生存时间为 60 秒,使用适当的工厂推送不同的值。

该调度程序不可重新启动。

这是他们两个的日志:

------ elastic ---------  
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (elastic-2) | onNext(0)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(1) …
Run Code Online (Sandbox Code Playgroud)

java multithreading scheduler project-reactor reactive-streams

5
推荐指数
1
解决办法
7508
查看次数

如何在反应式 mongo 驱动程序中模拟 FindPublisher

我正在使用 Java 中的 mongo 反应式驱动程序和反应流库编写一个应用程序。

我有以下 DAO 代码:

@Override
public Flux<ContentVersion> findBySearch(String appKey, ContentVersionSearchRequest request, Pager pager) {
    final var searchResultsPublisher = mongoClient.getDatabase(appKey)
            .getCollection(COLLECTION_CONTENT_VERSION, ContentVersion.class)
            .find(prepareSearchFilter(request))
            .sort(orderBy(ascending(FIELD_VERSION_STATUS_ORDER), descending(FIELD_UPDATE_DATE)))
            .skip(pager.getSkip())
            .limit(pager.getMax());
    return Flux.from(searchResultsPublisher);
}
Run Code Online (Sandbox Code Playgroud)

在junit测试中,我模拟MongoClient、MongoDatabase、MongoCollection,但最后MongoCollection返回一个FindPublisher,我不知道如何正确模拟它。

我已经通过模拟订阅方法成功编写了单元测试,如下所示。然而,这对我来说似乎不对。

@Mock
private MongoClient mongoClient;

@Mock
private MongoDatabase database;

@Mock
private MongoCollection<ContentVersion> collection;

@Mock
private FindPublisher<ContentVersion> findPublisher;

@Mock
private UpdateResult updateResult;

@InjectMocks
private ContentVersionDaoImpl contentVersionDao;

@BeforeEach
void initCommonMocks() {
    when(mongoClient.getDatabase("ddpApp")).thenReturn(database);
    when(database.getCollection(MongoConstants.COLLECTION_CONTENT_VERSION, ContentVersion.class)).thenReturn(collection);
    when(collection.find(any(Bson.class))).thenReturn(findPublisher);
    when(collection.find(any(Document.class))).thenReturn(findPublisher);
    when(findPublisher.limit(anyInt())).thenReturn(findPublisher);
    when(findPublisher.skip(anyInt())).thenReturn(findPublisher);
    when(findPublisher.sort(any())).thenReturn(findPublisher);
}

@Test
void shouldFindBySearch() {
    final var contentVersion1 = …
Run Code Online (Sandbox Code Playgroud)

java unit-testing mongodb reactive-streams

5
推荐指数
0
解决办法
1078
查看次数

在 Reactor 中进行阻塞调用时,publishOn 与 subscribeOn

在博客文章Flight of the Flux 3中,作者建议将同步阻塞调用包装在Monowith a subscribeOncall 中,如文章中的以下代码片段所示:

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}
Run Code Online (Sandbox Code Playgroud)

但在同一篇文章的前面,他们表明您可以使用它publishOn来确保在单独的线程上完成阻塞调用:

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Run Code Online (Sandbox Code Playgroud)

既然如此,为什么不直接betterFetchUrls …

project-reactor reactive-streams

5
推荐指数
1
解决办法
3520
查看次数