使用Slick,您可以执行以下操作以从表中生成结果流:
val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)
p.foreach { s => println(s"Event: $s") }
Run Code Online (Sandbox Code Playgroud)
这将打印events表中的所有事件并在最后一行之后终止.
假设您可以通过某种方式通知您何时在events表中输入新行,是否可以编写一个在插入事件时连续输出事件的流?一种tail -f用于DB表的.
我认为Slick本身不支持这个,但我认为应该可以使用Akka流媒体来提供帮助.因此,如果您可以从Slick Source获取某些东西,直到它为空,那么等待一个事件来指示表中的更多数据,然后流式传输新数据.可能通过使用一个ActorPublisher绑定这个逻辑?
只是想知道某人是否有这方面的经验或任何建议?
我有以下简单的案例类层次结构:
sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message
Run Code Online (Sandbox Code Playgroud)
我有一个Flow[Message, Message, NotUsed](来自基于Websocket的协议,已经有编解码器).
我想将它解复用Flow[Message]为Foo和Baz类型的单独流程,因为它们由完全不同的路径处理.
这样做最简单的方法是什么?应该是显而易见的,但我遗漏了一些东西......
开始使用akka-streams我想构建一个简单的例子.在使用Web套接字插件的chrome中,我只需连接到这样的流https://blockchain.info/api/api_websocket via wss://ws.blockchain.info/inv和发送2命令
{"op":"ping"}{"op":"unconfirmed_sub"}
将在chromes web socket插件窗口中传输结果.我试图在akka流中实现相同的功能,但我遇到了一些问题:
遵循http://doc.akka.io/docs/akka/2.4.7/scala/http/client-side/websocket-support.html或http://doc.akka.io/docs/akka的教程时-http/10.0.0/scala/http/client-side/websocket-support.html#half-closed-client-websockets 以下是我的改编:
object SingleWebSocketRequest extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
}
val commandMessages = Seq(TextMessage("{\"op\":\"ping\"}"), TextMessage("{\"op\":\"unconfirmed_sub\"}"))
val helloSource: Source[Message, NotUsed] = Source(commandMessages.to[scala.collection.immutable.Seq])
// the Future[Done] is the materialized value of Sink.foreach
// and it …Run Code Online (Sandbox Code Playgroud) 我需要通过HTTP启动一些内容的下载,然后将数据作为反应流来读取.
因此,即使下载的数据很大,我几乎可以立即读取响应体的前几个字节(无需等待整个响应体).然后,做一些计算,并在几秒钟内读取另一部分数据.缓存数据必须有一些限制,因为操作内存无法处理整个内容(数十GB).
我一直在尝试使用HttpClient的sendAsync方法BodyHandlers.ofInputStream(),但它总是阻塞并等待所有数据到达.
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://..."))
.build();
HttpResponse<InputStream> response = client
.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
.get(); // this finishes as soon as the header is received
try {
InputStream stream = response.body();
byte[] test = stream.readNBytes(20); // trying to read just a few bytes
// but it waits for the whole body
} catch (IOException ex) {}
Run Code Online (Sandbox Code Playgroud)
我需要更改什么才能逐渐下载响应正文?
Monix的作者说将Monix与FS2进行比较
FS2更好的地方:
- 生产者和消费者之间的沟通模型是基于拉动的,有时使实施新运营商变得更加容易
Monix更好的地方:
- 生产者和消费者之间的沟通模型是基于推送的(带有背压),这使其本质上更加有效
很少有问题出现:
我有以下异步任务:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
Run Code Online (Sandbox Code Playgroud)
和下面的服务类:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be …Run Code Online (Sandbox Code Playgroud) 我一直在玩Java Flow操作系统,offer但在阅读完文档后做了我的测试我不明白.
在这里我的测试
@Test
public void offer() throws InterruptedException {
//Create Publisher for expected items Strings
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//Register Subscriber
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.subscribe(new CustomSubscriber<>());
publisher.offer("item", (subscriber, value) -> false);
Thread.sleep(500);
}
Run Code Online (Sandbox Code Playgroud)
offer服务器接收一个要发出的项和一个BiPredicate函数,据我所知阅读文档,只有在谓词函数为true的情况下才会发出它.
Bur通过测试结果是
Subscription done:
Subscription done:
Subscription done:
Got : item --> onNext() callback
Got : item --> onNext() callback
Got : item --> onNext() callback
Run Code Online (Sandbox Code Playgroud)
结果没有变化,如果不是假,我返回true.
任何人都可以向我解释一下这个算子好一点.
我正在研究 Flux 和 Mono,并在多线程环境中使用它们,并使用提供工作线程的 Schedular。
使用elastic、parallel 和newElastic 启动Schedular 有很多选项。
这是我使用的代码:
System.out.println("------ elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2)
.publishOn(Schedulers.elastic()).log()
.blockLast();
System.out.println("------ new elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2).log()
.publishOn(Schedulers.newElastic("my")).log()
.blockLast();
Run Code Online (Sandbox Code Playgroud)
并且他们都有相同的文档:
调度程序动态创建基于 ExecutorService 的 Workers 并缓存线程池,并在 Workers 关闭后重用它们。
创建线程池的最大数量没有限制。
未使用的线程池的默认生存时间为 60 秒,使用适当的工厂推送不同的值。
该调度程序不可重新启动。
这是他们两个的日志:
------ elastic ---------
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (elastic-2) | onNext(0)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(1) …Run Code Online (Sandbox Code Playgroud) java multithreading scheduler project-reactor reactive-streams
我正在使用 Java 中的 mongo 反应式驱动程序和反应流库编写一个应用程序。
我有以下 DAO 代码:
@Override
public Flux<ContentVersion> findBySearch(String appKey, ContentVersionSearchRequest request, Pager pager) {
final var searchResultsPublisher = mongoClient.getDatabase(appKey)
.getCollection(COLLECTION_CONTENT_VERSION, ContentVersion.class)
.find(prepareSearchFilter(request))
.sort(orderBy(ascending(FIELD_VERSION_STATUS_ORDER), descending(FIELD_UPDATE_DATE)))
.skip(pager.getSkip())
.limit(pager.getMax());
return Flux.from(searchResultsPublisher);
}
Run Code Online (Sandbox Code Playgroud)
在junit测试中,我模拟MongoClient、MongoDatabase、MongoCollection,但最后MongoCollection返回一个FindPublisher,我不知道如何正确模拟它。
我已经通过模拟订阅方法成功编写了单元测试,如下所示。然而,这对我来说似乎不对。
@Mock
private MongoClient mongoClient;
@Mock
private MongoDatabase database;
@Mock
private MongoCollection<ContentVersion> collection;
@Mock
private FindPublisher<ContentVersion> findPublisher;
@Mock
private UpdateResult updateResult;
@InjectMocks
private ContentVersionDaoImpl contentVersionDao;
@BeforeEach
void initCommonMocks() {
when(mongoClient.getDatabase("ddpApp")).thenReturn(database);
when(database.getCollection(MongoConstants.COLLECTION_CONTENT_VERSION, ContentVersion.class)).thenReturn(collection);
when(collection.find(any(Bson.class))).thenReturn(findPublisher);
when(collection.find(any(Document.class))).thenReturn(findPublisher);
when(findPublisher.limit(anyInt())).thenReturn(findPublisher);
when(findPublisher.skip(anyInt())).thenReturn(findPublisher);
when(findPublisher.sort(any())).thenReturn(findPublisher);
}
@Test
void shouldFindBySearch() {
final var contentVersion1 = …Run Code Online (Sandbox Code Playgroud) 在博客文章Flight of the Flux 3中,作者建议将同步阻塞调用包装在Monowith a subscribeOncall 中,如文章中的以下代码片段所示:
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
Run Code Online (Sandbox Code Playgroud)
但在同一篇文章的前面,他们表明您可以使用它publishOn来确保在单独的线程上完成阻塞调用:
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Run Code Online (Sandbox Code Playgroud)
既然如此,为什么不直接betterFetchUrls …
reactive-streams ×10
java ×5
scala ×4
akka-stream ×3
akka ×2
fs2 ×1
http ×1
java-11 ×1
java-9 ×1
java-flow ×1
mongodb ×1
monix ×1
scheduler ×1
slick ×1
unit-testing ×1