我们正在 kubernetes 上运行一个 5 节点的 flink 集群(1.6.3),具有 5 个分区的 Kafka 主题源。5 个作业正在读取该主题(具有不同的消费者群体),每个作业的并行度 = 5。
每个任务管理器运行 10Gb 内存,任务管理器堆大小限制为 2Gb。摄取负载相当小(每秒 100-200 条消息),平均消息大小约为 4-8kb。所有作业都运行了几个小时。一段时间后,我们突然看到一项或多项工作失败:
ava.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:666)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Run Code Online (Sandbox Code Playgroud)
flink 重新启动作业,但它一直在该异常上失败。我们已经尝试按照此处的建议减少记录轮询: Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory 我们还尝试按照此处的建议增加 kafka 堆大小: Flink + Kafka, java.lang.OutOfMemoryError when parallelism > …
apache-kafka kafka-consumer-api apache-flink flink-streaming
我的新工作场所大量使用函数式Java Either进行错误处理(http://www.functionaljava.org/javadoc/4.5/functionaljava/fj/data/Either.html)。
几乎根本没有使用异常。
出于多种原因,这是非常令人讨厌的imo。举例说明:方法内的每个api调用(返回Either)必须首先检查返回的Either是否是错误,然后再继续执行下一行。如果是错误,则以Either形式将其传播回方法调用堆栈中。堆栈中的每个方法都还需要检查重新调整的Either中的错误,直到最终我们到达负责处理错误的方法为止。这导致结构非常糟糕的Java代码。我真的不能编写正常的流Java代码,因为我必须在每个api调用上“停止”(因此流不成问题)。例如
Either<Error, Value> myMethod(String someVar) {
Either<Error, Value> valEither someService.getSomething(someVar)
if (valEither.isRight()) {
Either<Error, Value> otherValue someService.getSomethingElse(valEither.right().value())
if (otherValue.isRight()) ..... //you get the idea
} else {
//maybe log
return valEither;
}
}
Run Code Online (Sandbox Code Playgroud)
我当然可以使用Either的单调方法(我也这样做),但这不能解决必须“询问”返回类型(如果有错误或值)的核心问题。为什么我认为将Either用作错误处理基础结构并不是一个好主意,原因有很多(长分配语句,长方法,嵌套泛型等)。
要解决此问题,我想到了可能会为每个正在返回“错误”的api调用抛出特定异常,然后在当前正在使用的顶级方法上捕获一次异常。
Value val = myService.getSomething().rightOrThrow(new MyException("blaa"));
Run Code Online (Sandbox Code Playgroud)
但这似乎是不可能的,因为在Either投影类型上唯一能做类似事情的方法会抛出Java错误,这并不意味着(几乎)在任何情况下都会被捕获(我可能会偶然捕获到stackoverflow或内存不足错误)。
myService.getSomething().right().valueE("some error msg");
Run Code Online (Sandbox Code Playgroud)
有什么建议或想法吗?
谢谢
我正在尝试让Dropwizard与会议合作。我已经读过0.7,Dropwizard添加了会话支持。来自发行说明:“增加了对HTTP会话的支持。将带注释的参数添加到资源方法:@Session HttpSession会话中,以注入会话上下文。”
我有一个示例资源类,它从http get请求获取用户/密码,并且我想在会话中保存用户名:
@GET
public Response auth(@QueryParam("user") String user, @QueryParam("password") String password, @Session HttpSession session) throws URISyntaxException {
URI rootUri = getApplicationRootUri();
if(!user.equals(config.getUser()) || !password.equals(config.getPassword())) {
return Response.temporaryRedirect(rootUri).build();
}
session.setAttribute("user", user);
return Response.temporaryRedirect( new URI("/../index.html")).build();
}
Run Code Online (Sandbox Code Playgroud)
尝试从我的应用程序登录时,我得到:
ERROR [2014-05-17 10:33:45,244] com.sun.jersey.spi.container.ContainerRequest: A message body reader for Java class javax.servlet.http.HttpSession, and Java type interface javax.servlet.http.HttpSession, and MIME media type application/octet-stream was not found.
Run Code Online (Sandbox Code Playgroud)
我尝试从github上的dropwizard源中查找一些测试,似乎@Session注释的使用方式与我的代码相同。
有任何想法吗?
谢谢!
在阅读了flink的文档并四处搜寻之后,我无法完全理解flink的窗口中如何处理状态。可以说我有一个每小时运行的带有聚合函数的滚动窗口,该函数将msg累积到某些Java pojo或scala case类中。该窗口的大小将与在一小时内进入该窗口的事件的数量相关,或者仅将其与pojo / case类相关,因为会将事件累积到该对象中。(例如,如果将10000 msgs计数为整数,大小将接近10000 * msg大小还是int的大小?)此外,如果im使用pojos或case类,flink是否会为我处理状态(如果内存溢出到磁盘)在检查点用尽/保存状态等)还是我必须使用flink的状态对象?
谢谢你的帮助!
我正在尝试在 C* 中释放一些磁盘空间。
我删除了许多行,这些行创建了许多墓碑。
我正在运行 nodetoolgarbagecollect 并且想知道这个工具在幕后做什么。我读过它会删除墓碑隐藏的实际数据,但不会删除墓碑(将在 gc_grace_seconds 后清除)。那是准确的吗?垃圾收集工具与 gc_grace_seconds 参数没有任何关联?垃圾收集实际上是如何释放磁盘空间的?
关于此工具的工作原理和作用的文档并不多。
任何帮助都感激不尽
apache-flink ×2
java ×2
apache-kafka ×1
cassandra ×1
dropwizard ×1
either ×1
nodetool ×1
session ×1