我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:
Source
.tick(0 second, 50 millis, () => if (Random.nextBoolean) (1, s"A") else (2, s"B"))
.map { f => f() }
.groupBy(10, _._1)
// how to aggregate grouped elements here for two seconds?
.scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) }
.to(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)
期望的输出应该如下所示:
Seq(A, A, A, A, A)
Seq(B, B, B)
Seq(A, A)
Seq(B, B, B, B, B)
// and so on
Run Code Online (Sandbox Code Playgroud)
如何使用流实现此类功能?
我有一个分页的资源,我想与Monix递归使用它。我想要一个Observable,它将发出下载的元素并递归使用页面。这是一个简单的例子。当然不行。它发出第一页,然后是第一页+第二页,然后是第一+第二+第三页。我希望它先发出,然后发出第二,然后发出第三,依此类推。
object Main extends App {
sealed trait Event
case class Loaded(xs: Seq[String]) extends Event
// probably should just finish stream instead of this event
case object Done extends Event
// here is the problem
def consume(page: Int, size: Int):Observable[Event] = {
Observable.fromFuture(getPaginatedResource(page, size)).concatMap{ xs =>
if (xs.isEmpty) Observable.pure(Done)
else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
}
}
def getPaginatedResource(page: Int, size: Int):Future[Seq[String]] = Future {
if (page * size > 100) Seq.empty
else 0 to size map (x …Run Code Online (Sandbox Code Playgroud) 我有一个循环,我在其中做一些工作并使用 发送结果Sender。这项工作需要时间,如果失败我需要重试。当我重试时,接收器可能已关闭,我的重试将是浪费时间。因此,我需要一种方法来检查是否Receiver可用而不发送消息。
在理想的世界中,我希望我的代码在伪代码中看起来像这样:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do som stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
Run Code Online (Sandbox Code Playgroud)
问题是Sender没有is_closed方法。它确实有pub fn …
我有这个动作:
package com.test;
import com.opensymphony.xwork2.Action;
public class TestAction implements Action{
private String simpleParam;
public String getSimpleParam() {
return simpleParam;
}
public void setSimpleParam(String simpleParam) {
this.simpleParam = simpleParam;
}
@Override
public String execute() throws Exception {
return SUCCESS;
}
}
Run Code Online (Sandbox Code Playgroud)
当它执行时,我想调用struts内的另一个动作(例如不是重定向)并传递给它simpleParam。第二个动作是:
package com.test;
import com.opensymphony.xwork2.Action;
public class HelloAction implements Action {
private String id;
private String result;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getResult() {
return result;
} …Run Code Online (Sandbox Code Playgroud) 我对Akka很新,并试图了解如何处理依赖注入.我遇到了这个考试,它引导我进入Akka扩展,现在我试图理解它与简单的静态方法调用有何不同.关于扩展的Akka教程部分有这样的例子:
public class CountExtensionImpl implements Extension {
private final AtomicLong counter = new AtomicLong(0);
public long increment() {
return counter.incrementAndGet();
}
}
public class CountExtension extends AbstractExtensionId<CountExtensionImpl> implements ExtensionIdProvider {
public static final CountExtension EXTENSION = new CountExtension();
private CountExtension() {
}
@Override
public CountExtensionImpl createExtension(ExtendedActorSystem system) {
return new CountExtensionImpl();
}
@Override
public ExtensionId<? extends Extension> lookup() {
return EXTENSION;
}
}
public class MyActor extends UntypedActor{
@Override
public void onReceive(Object message) throws Exception {
System.out.println(CountExtension.EXTENSION.get(getContext().system()).increment());
} …Run Code Online (Sandbox Code Playgroud) 我有一个带有泛型类型参数的特征。我想将实现此特征的不同对象放入一个集合中。对象有不同的类型参数。
当我这样做时,编译器告诉我需要指定泛型类型参数。实际上,我的情况不需要这种通用类型信息,因此某种通配符适合我。让我展示代码,因为它更好地表明了我的意图:
trait Test<T> {
fn test(&self) -> T;
}
struct Foo;
struct Bar;
impl Test<i64> for Foo {
fn test(&self) -> i64 {
println!("foo");
42
}
}
impl Test<String> for Bar {
fn test(&self) -> String {
println!("bar");
"".to_string()
}
}
fn main() {
// I'm not going to invoke test method which uses generic type parameter.
// So some kind of wildcard would work for me.
// But underscore is not wildcard and this does not compile.
let …Run Code Online (Sandbox Code Playgroud)