我有一项可以以很高的速率传输邮件的服务。
目前,它由akka-tcp提供服务,每分钟发送350万条消息。我决定尝试一下grpc。不幸的是,它导致吞吐量大大降低:每分钟约50万条消息甚至更低。
您能推荐如何优化它吗?
我的设定
硬件:32核,24Gb堆。
grpc版本:1.25.0
消息格式和端点
消息基本上是一个二进制blob。客户端将100K-1M和更多消息流传输到同一请求中(异步),服务器不响应任何内容,客户端使用无操作观察者
service MyService {
rpc send (stream MyMessage) returns (stream DummyResponse);
}
message MyMessage {
int64 someField = 1;
bytes payload = 2; //not huge
}
message DummyResponse {
}
Run Code Online (Sandbox Code Playgroud)
问题:与akka实施相比,消息速率低。我观察到CPU使用率较低,因此我怀疑grpc调用实际上在内部阻塞,尽管它另有说明。onNext()确实不会立即返回调用,但表上还存在GC。
我试图产生更多的发件人来缓解此问题,但并没有太大的改进。
我的发现 Grpc在序列化每个消息时实际上分配了一个8KB的字节缓冲区。请参阅堆栈跟踪:
java.lang.Thread.State:在com.google.common.io.ByteStreams.copy(ByteStreams.java:com.google.common.io.ByteStreams.createBuffer(ByteStreams.java:58)处处于阻塞状态(在对象监视器上): 105)在io.grpc.internal.MessageFramer.writeToOutputStream(MessageFramer.java:274)在io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:230)在io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java :168)位于io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)位于io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:53)位于io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream。 io.grpc.internal.DelayedStream.writeMessage(DelayedStream.java:252)的io.grpc.internal的java:37)。io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:457)的io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)的io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:473)io.grpc.ForwardingClientCall.sendMessage的(ForwardingClientCall.java:37)位于io.grpc.stub.ClientCalls $ CallToStreamObserverAdapter.onNext(ClientCalls.java:346)
对于构建高吞吐量grpc客户的最佳做法的任何帮助,均表示赞赏。
如今,flatMap是类似monad的对象上对应操作的最广泛使用的名称。但是我找不到它第一次出现在哪里以及什么流行。
我知道的最古老的外观是在Scala中。在Haskell中称为bind。在类别理论中,使用希腊表示法。
language-agnostic monads history functional-programming terminology
是否有可能以某种方式将解决方案扩展为总和类型?
sealed trait Group
case class A extends Group
case class B extends Group
case class C extends Group
def divide(l : List[Group]): //Something from what I can extract List[A], List[B] and List[C]
Run Code Online (Sandbox Code Playgroud) 我有一个曾经有很多数据的表,但由于 ttl 很久以前,这些数据已经死亡。
但是,当我运行诸如SELECT * FROM my_table LIMIT 10或SELECT count(*) FROM my_table几秒钟超时之类的查询时。
当我使用 spark-cassandra-connector(它在内部通过令牌范围查询表)读取数据时,实际上需要几分钟才能获得 0 个条目。
我怀疑压缩有问题,墓碑没有被删除,但表数据目录看起来并不大:
/var/lib/scylla$ sudo ls -la
/data/scylla/data/my_space/my_table-75f8388035c211e9bc85000000000011
总计 2604 drwx------ 4 scylla scylla 8192 十一月 27 日 15:55。
drwx------ 215 scylla scylla 16384 Nov 15 19:00 ..
-rw-r--r-- 1 scylla scylla 538 Nov 25 22:58 mc-1552402-big-CompressionInfo.db
-rw-r--r-- 1 scylla scylla 162869 Nov 25 22:58 mc-1552402-big-Data.db
-rw-r--r-- 1 scylla scylla 11 月 25 日 10 日 22:58 mc-1552402-big-Digest.crc32
-rw-r--r-- 1 scylla scylla …
外延公理说,如果两个函数在域的每个参数上的动作相等,则它们是相等的。
Axiom func_ext_dep : forall (A : Type) (B : A -> Type) (f g : forall x, B x),
(forall x, f x = g x) -> f = g.
Run Code Online (Sandbox Code Playgroud)
平等=的定理声明的两侧是命题平等(具有单个数据类型eq_refl构造函数)。使用这个公理可以证明f = a + b和g = b + a在命题上是相等的。
但是f和g显然不等于数据结构。
你能解释一下我在这里遗漏了什么吗?可能函数对象没有正常形式?
(1)我有一个资源密集型函数f,其调用导致大量内存分配。因为在f内部,将存在大量异议关系集合以构造到内存中。
(2)但是,在我的工作量中,我碰巧需要反复调用函数,例如在for循环和map函数中调用超过1K次。这样,调用f的循环会迅速关闭JVM。
for {
calling f here
}
Run Code Online (Sandbox Code Playgroud)
为了使上述工作负载正常工作,我在调用f之前引入Thread.sleep来引入间隔以延迟f调用的每次迭代,如下所示
for {
Thread.sleep (10)
calling f here
}
Run Code Online (Sandbox Code Playgroud)
这个经过时间的确会减少总内存使用量,以使工作量超过大型工作量。
(3)但是,有以下已知作用:(a)增加GC的频率,(b)增加总响应时间。因此,我需要根据客户端和服务器的超时配置进行调整。(c)延迟可以线性增长,当需要增加迭代时,延迟不会扩大。(d)如果有同时请求触发上述相同工作量的请求,则其他请求将超时。
我的问题:如何从字面上调用资源密集型功能:
(A)(2)以合理的响应时间同时处理上述一个大工作量的最佳方法是什么?(B)(2)同时处理多个大型工作负载的最佳方法是什么?