小编Art*_*nko的帖子

阿卡流.分组,聚合一段时间并发出结果

我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:

  Source
    .tick(0 second, 50 millis, () => if (Random.nextBoolean) (1, s"A") else (2, s"B"))
    .map { f => f() }
    .groupBy(10, _._1)
    // how to aggregate grouped elements here for two seconds?
    .scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) }
    .to(Sink.foreach(println))
Run Code Online (Sandbox Code Playgroud)

期望的输出应该如下所示:

Seq(A, A, A, A, A)
Seq(B, B, B)
Seq(A, A)
Seq(B, B, B, B, B)
// and so on
Run Code Online (Sandbox Code Playgroud)

如何使用流实现此类功能?

scala akka akka-stream

5
推荐指数
1
解决办法
2483
查看次数

如何在Scala中使用Monix消耗分页资源?

我有一个分页的资源,我想与Monix递归使用它。我想要一个Observable,它将发出下载的元素并递归使用页面。这是一个简单的例子。当然不行。它发出第一页,然后是第一页+第二页,然后是第一+第二+第三页。我希望它先发出,然后发出第二,然后发出第三,依此类推。

object Main extends App {

  sealed trait Event
  case class Loaded(xs: Seq[String]) extends Event
  // probably should just finish stream instead of this event
  case object Done extends Event

  // here is the problem
  def consume(page: Int, size: Int):Observable[Event] = {
    Observable.fromFuture(getPaginatedResource(page, size)).concatMap{ xs =>
      if (xs.isEmpty) Observable.pure(Done)
      else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
    }
  }

  def getPaginatedResource(page: Int, size: Int):Future[Seq[String]] = Future {
    if (page * size > 100) Seq.empty
    else 0 to size map (x …
Run Code Online (Sandbox Code Playgroud)

scala monix

5
推荐指数
1
解决办法
150
查看次数

如何查找 tokio::sync::mpsc::Receiver 是否已关闭?

我有一个循环,我在其中做一些工作并使用 发送结果Sender。这项工作需要时间,如果失败我需要重试。当我重试时,接收器可能已关闭,我的重试将是浪费时间。因此,我需要一种方法来检查是否Receiver可用而不发送消息。

在理想的世界中,我希望我的代码在伪代码中看起来像这样:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
   // do som stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
       break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};
Run Code Online (Sandbox Code Playgroud)

问题是Sender没有is_closed方法。它确实有pub fn …

future rust rust-tokio

5
推荐指数
1
解决办法
3263
查看次数

Struts2中如何将结果转发到另一个动作?

我有这个动作:

package com.test;

import com.opensymphony.xwork2.Action;

public class TestAction implements Action{
    private String simpleParam;

    public String getSimpleParam() {
        return simpleParam;
    }

    public void setSimpleParam(String simpleParam) {
        this.simpleParam = simpleParam;
    }

    @Override
    public String execute() throws Exception {
        return SUCCESS;
    }
}
Run Code Online (Sandbox Code Playgroud)

当它执行时,我想调用struts内的另一个动作(例如不是重定向)并传递给它simpleParam。第二个动作是:

package com.test;

import com.opensymphony.xwork2.Action;

public class HelloAction implements Action {
    private String id;
    private String result;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getResult() {
        return result;
    } …
Run Code Online (Sandbox Code Playgroud)

java struts2

4
推荐指数
1
解决办法
3万
查看次数

Akka扩展有什么好处?

我对Akka很新,并试图了解如何处理依赖注入.我遇到了这个考试,它引导我进入Akka扩展,现在我试图理解它与简单的静态方法调用有何不同.关于扩展的Akka教程部分有这样的例子:

public class CountExtensionImpl implements Extension {

    private final AtomicLong counter = new AtomicLong(0);

    public long increment() {
        return counter.incrementAndGet();
    }
}

public class CountExtension extends AbstractExtensionId<CountExtensionImpl> implements ExtensionIdProvider {

    public static final CountExtension EXTENSION = new CountExtension();

    private CountExtension() {

    }

    @Override
    public CountExtensionImpl createExtension(ExtendedActorSystem system) {
        return new CountExtensionImpl();
    }

    @Override
    public ExtensionId<? extends Extension> lookup() {
        return EXTENSION;
    }
}

public class MyActor extends UntypedActor{


    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println(CountExtension.EXTENSION.get(getContext().system()).increment());

    } …
Run Code Online (Sandbox Code Playgroud)

java akka

2
推荐指数
1
解决办法
447
查看次数

是否有可能拥有动态通用特征的集合?

我有一个带有泛型类型参数的特征。我想将实现此特征的不同对象放入一个集合中。对象有不同的类型参数。

当我这样做时,编译器告诉我需要指定泛型类型参数。实际上,我的情况不需要这种通用类型信息,因此某种通配符适合我。让我展示代码,因为它更好地表明了我的意图:

trait Test<T> {
    fn test(&self) -> T;
}
struct Foo;
struct Bar;
impl Test<i64> for Foo {
    fn test(&self) -> i64 {
        println!("foo");
        42
    }
}
impl Test<String> for Bar {
    fn test(&self) -> String {
        println!("bar");
        "".to_string()
    }
}

fn main() {
    // I'm not going to invoke test method which uses generic type parameter.
    // So some kind of wildcard would work for me.
    // But underscore is not wildcard and this does not compile.
    let …
Run Code Online (Sandbox Code Playgroud)

rust

1
推荐指数
1
解决办法
850
查看次数

标签 统计

akka ×2

java ×2

rust ×2

scala ×2

akka-stream ×1

future ×1

monix ×1

rust-tokio ×1

struts2 ×1