基于 Akka-HTTP 的这个简单代码:
val route =
pathPrefix("myapp") {
path("search") {
get {
//ref ! DoSomething("foo")
complete(HttpEntity(ContentTypes.`application/json`, /* content here from an actor */ ))
}
}
}
Run Code Online (Sandbox Code Playgroud)
如何从 Actor ( sender ! content)返回值?
我最近开始摆弄 akka 的 actor 和 http 模块。然而,我偶然发现了一个相当烦人的小怪癖,即创建单一演员。
下面是两个例子:
1)
我有一个内存缓存,我的服务非常小(它是一个应用程序),所以我真的很喜欢这个内存模型。我可以在 Map 中保存与用户相关的大多数信息(嗯,列表的映射,但仍然很容易推理结构),并且我没有得到 redis、geode 或 aerospike 的开销和复杂性。
唯一的问题是这个内存中的缓存可以被多个源修改,并且所述修改必须是同步的。我没有同步该结构的所有 3 个访问方法(例如,通过构建消息队列或实现锁),而是将结构及其访问方法包装到参与者中,构建消息队列,简单的接收->发送逻辑和如果规模扩大,将很容易用 DA actor 代替专用的内存数据库。
2)我有一个“服务”层,应该用于为各种作业调度参与者(访问数据库,访问内存缓存,使用数据进行计算并将结果传递给用户......等)。
将此服务层理解为某种“单例”,即某些功能的闭包,因为它不执行任何阻塞或 CPU/内存密集型操作,它只是简单地向下分配任务(例如,决定有多少个任务)。演员/线程/我们应该被创建以及请求应该去哪里)
然而,这件事需要:
a) 使两个对象成为单例参与者或
b) 使两个对象都成为实际的“对象”(如在 scala 对象表示法中指定单个命名单例,其函数在其范围内具有闭包)
b) 存在很多问题,即服务层必须将 Actor 系统“传递”给它(我不确定这是最佳实践),以便创建 Actor,而不是创建自己的 Actor “childrens” 它将使用全局参与者系统创建儿童,并且消息传递和监控逻辑将更加尴尬和不直观。另外,内存中的缓存不会具有内置消息队列的优势(我并不是说它很难实现,但这似乎是其中一种情况,人们会说“哦,高兴,这很好,我有演员,我不必花时间实现和测试此代码”)
a) 一般而言,似乎存在 akka 文档中记录不足且未建议的问题。我是说:
http://doc.akka.io/docs/akka/2.4/scala/cluster-singleton.html
看看这个狗屎,一半的文档警告不要使用它,它是它自己的依赖项,坦率地说,对于像我这样还没有涉足函数式和并发编程象牙塔的可怜的草皮来说,它很难阅读。
所以,嗯。你们中的任何人都可以向我解释为什么使用单例演员不好吗?如果他们不能成为演员,你如何设计单身人士?有没有什么方法可以设计出不会造成大量损坏的单例演员?整个“服务”模型是否具有“全局”服务,这些服务被调用而不是实例化“un akka like”?
我正在尝试使用reactivemongo-akkastream 0.12.1从Mongodb流式传输数据,并将结果返回到其中一个路由中的CSV流中(使用Akka-http)。我确实按照此处的示例实现了这一点:
它看起来工作正常。
我现在面临的唯一问题是如何将标题添加到输出 CSV 文件中。有任何想法吗?
谢谢
我想尝试下面的小例子:
object Webserver {
def main(args: Array[String]) {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route =
path("hello") {
get {
redirect(Uri("https://google.com"), StatusCodes.PermanentRedirect)
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when …Run Code Online (Sandbox Code Playgroud) 我正在开发一个 API,使用 Akka HTTP 将多个文件上传到 S3。我目前正在使用该fileUploadAll指令,它将所有文件缓冲到磁盘。这限制了可以处理的文件大小。有替代方法吗?我还能如何处理多部分/表单数据请求?
我想使用 Akka HTTP创建对Facebook API的表单数据 http 请求。在 curl 中,请求示例如下所示:
curl \
-X POST \
"https://graph-video.facebook.com/v2.3/1533641336884006/videos" \
-F "access_token=XXXXXXX" \
-F "upload_phase=transfer" \
-F "start_offset=0" \
-F "upload_session_id=1564747013773438" \
-F "video_file_chunk=@chunk1.mp4"
Run Code Online (Sandbox Code Playgroud)
因此,我为请求有效负载表示创建了以下模型:
case class FBSingleChunkUpload(accessToken: String,
sessionId: String,
from: Long,
to: Long,
file: File) //file is always ~1Mb
Run Code Online (Sandbox Code Playgroud)
然后我将使用:
Http().cachedHostConnectionPoolHttps[String]("graph-video.facebook.com")
Run Code Online (Sandbox Code Playgroud)
但我不知道如何转换FBSingleChunkUpload为正确的HttpRequest:(
你能给我一个提示,如何使用 Akka HTTP 表示这种类型的请求?
我有一个普通的 Play 2.6 应用程序,无法处理超过 12 个并发连接。它还会影响 Play 2.5。
这是一个示例控制器:
public class TestController extends Controller {
public Result index() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ok("");
}
}
Run Code Online (Sandbox Code Playgroud)
使用 12 个并发连接进行测试:
ab -n 12 -c 12 http://localhost:9000/
Run Code Online (Sandbox Code Playgroud)
输出:
...
Concurrency Level: 12
Time taken for tests: 1.005 seconds
Complete requests: 12
...
Run Code Online (Sandbox Code Playgroud)
因此,所有 12 个并发请求都在 1 秒内响应,这是预期的。
使用 13 个并发连接进行测试:
ab -n 13 -c 13 http://localhost:9000/
Run Code Online (Sandbox Code Playgroud)
输出:
...
Concurrency Level: 13
Time taken for tests: …Run Code Online (Sandbox Code Playgroud) 我正在努力学习akka-http和研究他们的榜样
这是我的代码的样子
val system = ActorSystem.create("enterpriseSystem", ConfigFactory.load("application"))
val notifier = system.actorOf(Props[Notifier], "notifier")
Run Code Online (Sandbox Code Playgroud)
和通知者
class Notifier extends Actor with ActorLogging {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import scala.concurrent.ExecutionContext.Implicits.global
def receive = {
case CommunicateECFailure =>
log.info("notifying about EC Failure")
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = "http://localhost:8080"))
responseFuture onComplete {
case response =>
log.info("response received {}", response)
log.info("notified about EC Failure")
}
}
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,我ActorSystem为每一个Actor创作创造了新的东西,那是不是很糟糕 我在akka文档中读到你不应该有很多ActorSystems
我怎么能避免这种情况?在施工期间将其作为参数传递?
我写了一个简单的演员,下载网页并将此页面的正文发送给发件人.我使用Akka HTTP来构建HTTP请求并处理HTTP响应.这是我的代码:
class Downloader(uri: String) extends Actor {
import akka.pattern.pipe
import context.dispatcher
final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
http.singleRequest(HttpRequest(uri = uri)) pipeTo self
println(s"SENDING request to $uri")
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
println(s"HttpResponse: SUCCESS")
val body = entity.dataBytes.runFold(ByteString(""))(_ ++ _) map (bytes => bytes.decodeString(ByteString.UTF_8)) foreach println
sender() ! body
context.stop(self)
case HttpResponse(code, _, _, _) =>
println(s"HttpResponse: FAILURE")
context.stop(self)
}
}
Run Code Online (Sandbox Code Playgroud)
在主程序中,我创建了10个开始在构造函数中下载其网页的actor.
val system = ActorSystem("akkaHttpClient")
for (i <- 1 …Run Code Online (Sandbox Code Playgroud) 我正在尝试用Manning的Manning的“ Akka in Action”重写一个POC项目的scala示例。该项目是用于创建事件和购买票证的小型Http服务器。
我正在演员可以发送影片Optional<Event>给我的时刻RestApi。根据是否存在该值,我应该使用OKelse 完成调用NOT_FOUND。
在Scala中,代码段如下所示:
get {
// GET /events/:event
onSuccess(getEvent(event)) {
_.fold(complete(NotFound))(e => complete(OK, e))
}
}
Run Code Online (Sandbox Code Playgroud)
... where getEvent返回一个Option[Event](等于java的Optional<Event>)。这就是我用Java重写的方式:
get(() -> onSuccess(() -> getEvent(event), eventGetRoute()))
...
//and eventGetRoute() is a function:
private Function<Optional<Event>, Route> eventGetRoute() {
return maybeEvent -> maybeEvent.map(event -> complete(OK, event, Jackson.marshaller())).orElseGet(() -> complete(NOT_FOUND));
}
Run Code Online (Sandbox Code Playgroud)
无法编译:Bad return type in lambda expression: Route cannot be converted to RouteAdapter。较长的(第一个)complete返回a …