小编Ram*_*gil的帖子

访问由Source.actorRef创建的akka​​流源的基础ActorRef

我正在尝试使用Source.actorRef方法来创建akka.stream.scaladsl.Source对象.形式的东西

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...
Run Code Online (Sandbox Code Playgroud)

我的问题是:如何将数据发送到基于ActorRef的Source对象

我假设向Source发送消息是一种形式

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)
Run Code Online (Sandbox Code Playgroud)

但是weatherSource没有!操作员或tell方法.

文件是不是关于如何使用Source.actorRef太过描述,它只是说,你可以...

提前感谢您的审核和回复.

scala akka akka-stream

27
推荐指数
3
解决办法
7396
查看次数

融化Pandas Dataframe的上三角矩阵

给定以下形式的方形pandas DataFrame:

   a  b  c
a  1 .5 .3
b .5  1 .4
c .3 .4  1
Run Code Online (Sandbox Code Playgroud)

我怎么才能melt得到上三角形

 Row     Column    Value
  a        a       1
  a        b       .5 
  a        c       .3
  b        b       1
  b        c       .4
  c        c       1 

#Note the combination a,b is only listed once.  There is no b,a listing     
Run Code Online (Sandbox Code Playgroud)

我对一个习惯性的熊猫解决方案更感兴趣,一个自定义索引器很容易手工编写...提前感谢您的考虑和响应.

python numpy reshape melt pandas

22
推荐指数
2
解决办法
9572
查看次数

akka http:Akka溪流与演员建立休息服务

在akka http上创建一个包含60+ API的REST Web服务时.如何选择是否应该选择akka流或akka演员?在他的帖子中,乔斯展示了两种在akka http上创建API的方法,但他没有显示何时我应该选择一个而不是另一个.

scala akka akka-stream akka-http

20
推荐指数
2
解决办法
5692
查看次数

map和mapAsync之间的区别

任何人都可以解释我地图和mapAsync与AKKA流之间的区别吗?在文档中,据说

可以使用mapAsync或mapAsyncUnordered执行涉及外部非基于流的服务的流转换和副作用

为什么我们不能简单地在这里映射?我假设Flow,Source,Sink都是Monadic,因此map应该在这些性质的延迟中正常工作?

scala akka-stream

19
推荐指数
1
解决办法
9291
查看次数

如何为多个(10k - 100k)请求正确调用Akka HTTP客户端?

我正在尝试使用Akka HTTP 2.0-M2编写批量数据上传工具.但我正面临着akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.

我试图隔离一个问题,这里的示例代码也失败了:

public class TestMaxRequests {
    private static final class Router extends HttpApp {
        @Override
        public Route createRoute() {
            return route(
                    path("test").route(
                            get(handleWith(ctx -> ctx.complete("OK")))
                    )
            );
        }
    }


    public static void main(String[] args) {
        ActorSystem actorSystem = ActorSystem.create();
        Materializer materializer = ActorMaterializer.create(actorSystem);

        Router router = new Router();
        router.bindRoute("127.0.0.1", 8082, actorSystem);

        LoggingAdapter log = Logging.getLogger(actorSystem, new Object());

        for (int i = 0; i < 100; i++) {
            final int reqNum …
Run Code Online (Sandbox Code Playgroud)

java akka reactive-streams akka-stream akka-http

13
推荐指数
1
解决办法
5013
查看次数

Akka-Stream实现比单线程实现慢

2015-10-30更新


基于Roland Kuhn Awnser:

Akka Streams在Actors之间使用异步消息传递来实现流处理阶段.在异步边界上传递数据有一个开销,你在这里看到:你的计算似乎只需要大约160ns(来自单线程测量),而流式解决方案每个元素需要大约1μs,这是由消息传递决定的.

另一个误解是说"流"意味着并行性:在你的代码中,所有计算都在一个Actor(映射阶段)中顺序运行,因此对原始单线程解决方案没有任何好处.

为了从Akka Streams提供的并行性中受益,您需要具有多个处理阶段,每个阶段都执行任务

每个元素1μs,另见文档.

我做了一些改变.我的代码现在看起来像:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder …
Run Code Online (Sandbox Code Playgroud)

scala reactive-streams akka-stream

10
推荐指数
2
解决办法
2473
查看次数

如何在Slick中使用反应流来插入数据

Slick的文档中,使用Reactive Streams的示例仅用于读取数据作为DatabasePublisher的一种方式.但是,如果您希望根据插入率将数据库用作接收器和后端,会发生什么?

我找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有一个来源,说:

val source = Source(0 to 100)

如何用Slick创建一个Sink,将这些值写入带有模式的表中:

create table NumberTable (value INT)

mysql slick reactive-streams akka-stream slick-3.0

10
推荐指数
2
解决办法
1712
查看次数

Scala的部分函数是否具有Java等价物?

Scala具有部分函数,这些函数仅适用于输入类型的某些值,但不是全部:

val isEven: PartialFunction[Int, String] = {
  case x if x % 2 == 0 => x+" is even"
} 

assert(isEven(10) equalsIgnoreCase "10 is even")
assert(isEven.isDefinedAt(11) == false)
Run Code Online (Sandbox Code Playgroud)

而且,更有用的是,scala允许将"partialness"应用于a的子类型trait:

sealed trait BaseTrait

case class Foo(i : Int) extends BaseTrait
case class Bar(s : String) extends BaseTrait

val fooPartialFunc : PartialFunction[BaseTrait, Int] = {
  case f : Foo => 42 + f.i
}

assert(fooPartialFunc(Foo(8)) == 50)
assert(fooPartialFunc.isDefinedAt(Bar("hello")) == false)
Run Code Online (Sandbox Code Playgroud)

java 8中的等价物是什么?

大多数谷歌搜索结果混淆了"部分功能"与currying,例如"部分应用功能".

java scala java-8

10
推荐指数
2
解决办法
488
查看次数

在 Python 中创建带零的 UUID

在 python 中,如何创建全零值的 UUID?

对于java有一个类似的问题,这个问题是针对python的。

python uuid zero python-3.x

10
推荐指数
2
解决办法
5537
查看次数

来自Typesafe配置的案例类实例化

假设我有一个scala案例类,可以序列化为json(使用json4s或其他一些库):

case class Weather(zip : String, temp : Double, isRaining : Boolean)
Run Code Online (Sandbox Code Playgroud)

如果我正在使用HOCON配置文件:

allWeather {

   BeverlyHills {
    zip : 90210
    temp : 75.0
    isRaining : false
  }

  Cambridge {
    zip : 10013
    temp : 32.0
    isRainging : true
  }

}
Run Code Online (Sandbox Code Playgroud)

有没有办法使用typesafe配置来自动实例化Weather对象?

我正在寻找形式的东西

val config : Config = ConfigFactory.parseFile(new java.io.File("weather.conf"))

val bevHills : Weather = config.getObject("allWeather.BeverlyHills").as[Weather]
Run Code Online (Sandbox Code Playgroud)

该解决方案可以利用引用的值"allWeather.BeverlyHills"是json"blob" 的事实.

我显然可以编写自己的解析器:

def configToWeather(config : Config) = 
  Weather(config.getString("zip"), 
          config.getDouble("temp"), 
          config.getBoolean("isRaining"))

val bevHills = configToWeather(config.getConfig("allWeather.BeverlyHills"))
Run Code Online (Sandbox Code Playgroud)

但这似乎不够优雅,因为天气定义的任何变化也需要改变configToWeather …

scala typesafe-config hocon

9
推荐指数
2
解决办法
6556
查看次数