小编yaa*_*rix的帖子

如何修复:java.lang.OutOfMemoryError:flink kafka 消费者中的直接缓冲内存

我们正在 kubernetes 上运行一个 5 节点的 flink 集群(1.6.3),具有 5 个分区的 Kafka 主题源。5 个作业正在读取该主题(具有不同的消费者群体),每个作业的并行度 = 5。

每个任务管理器运行 10Gb 内存,任务管理器堆大小限制为 2Gb。摄取负载相当小(每秒 100-200 条消息),平均消息大小约为 4-8kb。所有作业都运行了几个小时。一段时间后,我们突然看到一项或多项工作失败:

ava.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:666)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Run Code Online (Sandbox Code Playgroud)

flink 重新启动作业,但它一直在该异常上失败。我们已经尝试按照此处的建议减少记录轮询: Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory 我们还尝试按照此处的建议增加 kafka 堆大小: Flink + Kafka, java.lang.OutOfMemoryError when parallelism > …

apache-kafka kafka-consumer-api apache-flink flink-streaming

7
推荐指数
1
解决办法
2394
查看次数

在Java中进行错误处理

我的新工作场所大量使用函数式Java Either进行错误处理(http://www.functionaljava.org/javadoc/4.5/functionaljava/fj/data/Either.html)。

几乎根本没有使用异常。

出于多种原因,这是非常令人讨厌的imo。举例说明:方法内的每个api调用(返回Either)必须首先检查返回的Either是否是错误,然后再继续执行下一行。如果是错误,则以Either形式将其传播回方法调用堆栈中。堆栈中的每个方法都还需要检查重新调整的Either中的错误,直到最终我们到达负责处理错误的方法为止。这导致结构非常糟糕的Java代码。我真的不能编写正常的流Java代码,因为我必须在每个api调用上“停止”(因此流不成问题)。例如

Either<Error, Value> myMethod(String someVar) {
     Either<Error, Value> valEither someService.getSomething(someVar)
     if (valEither.isRight()) {
        Either<Error, Value> otherValue someService.getSomethingElse(valEither.right().value())
        if (otherValue.isRight()) ..... //you get the idea
     } else {
        //maybe log
        return valEither;
    }
}
Run Code Online (Sandbox Code Playgroud)

我当然可以使用Either的单调方法(我也这样做),但这不能解决必须“询问”返回类型(如果有错误或值)的核心问题。为什么我认为将Either用作错误处理基础结构并不是一个好主意,原因有很多(长分配语句,长方法,嵌套泛型等)。

要解决此问题,我想到了可能会为每个正在返回“错误”的api调用抛出特定异常,然后在当前正在使用的顶级方法上捕获一次异常。

Value val = myService.getSomething().rightOrThrow(new MyException("blaa"));
Run Code Online (Sandbox Code Playgroud)

但这似乎是不可能的,因为在Either投影类型上唯一能做类似事情的方法会抛出Java错误,这并不意味着(几乎)在任何情况下都会被捕获(我可能会偶然捕获到stackoverflow或内存不足错误)。

myService.getSomething().right().valueE("some error msg");
Run Code Online (Sandbox Code Playgroud)

有什么建议或想法吗?

谢谢

java error-handling functional-java either

5
推荐指数
1
解决办法
980
查看次数

dropwizard 0.7 @会话注释

我正在尝试让Dropwizard与会议合作。我已经读过0.7,Dropwizard添加了会话支持。来自发行说明:“增加了对HTTP会话的支持。将带注释的参数添加到资源方法:@Session HttpSession会话中,以注入会话上下文。”

我有一个示例资源类,它从http get请求获取用户/密码,并且我想在会话中保存用户名:

@GET
public Response auth(@QueryParam("user") String user, @QueryParam("password") String password,         @Session HttpSession session) throws URISyntaxException {
    URI rootUri = getApplicationRootUri();
    if(!user.equals(config.getUser()) || !password.equals(config.getPassword())) {

        return Response.temporaryRedirect(rootUri).build();
    }
    session.setAttribute("user", user);
    return Response.temporaryRedirect( new URI("/../index.html")).build();
}
Run Code Online (Sandbox Code Playgroud)

尝试从我的应用程序登录时,我得到:

ERROR [2014-05-17 10:33:45,244] com.sun.jersey.spi.container.ContainerRequest: A message body reader for Java class javax.servlet.http.HttpSession, and Java type interface javax.servlet.http.HttpSession, and MIME media type application/octet-stream was not found.
Run Code Online (Sandbox Code Playgroud)

我尝试从github上的dropwizard源中查找一些测试,似乎@Session注释的使用方式与我的代码相同。

有任何想法吗?

谢谢!

java session dropwizard

3
推荐指数
1
解决办法
2179
查看次数

Flink窗口状态大小和状态管理

在阅读了flink的文档并四处搜寻之后,我无法完全理解flink的窗口中如何处理状态。可以说我有一个每小时运行的带有聚合函数的滚动窗口,该函数将msg累积到某些Java pojo或scala case类中。该窗口的大小将与在一小时内进入该窗口的事件的数量相关,或者仅将其与pojo / case类相关,因为会将事件累积到该对象中。(例如,如果将10000 msgs计数为整数,大小将接近10000 * msg大小还是int的大小?)此外,如果im使用pojos或case类,flink是否会为我处理状态(如果内存溢出到磁盘)在检查点用尽/保存状态等)还是我必须使用flink的状态对象?

谢谢你的帮助!

stream-processing apache-flink

3
推荐指数
1
解决办法
363
查看次数

nodetool垃圾收集实际上在做什么

我正在尝试在 C* 中释放一些磁盘空间。
我删除了许多行,这些行创建了许多墓碑。
我正在运行 nodetoolgarbagecollect 并且想知道这个工具在幕后做什么。我读过它会删除墓碑隐藏的实际数据,但不会删除墓碑(将在 gc_grace_seconds 后清除)。那是准确的吗?垃圾收集工具与 gc_grace_seconds 参数没有任何关联?垃圾收集实际上是如何释放磁盘空间的?

关于此工具的工作原理和作用的文档并不多。

任何帮助都感激不尽

cassandra nodetool

2
推荐指数
1
解决办法
1385
查看次数