我们正在kubernetes上运行一个5节点flink集群(1.6.3),具有5个分区Kafka主题源。从该主题读取5个作业(具有不同的消费组),每个作业的并行度=5。
每个任务管理器都使用10Gb的ram运行,任务管理器堆大小限制为2Gb。摄取负载相当小(每秒100-200 msgs),平均消息大小约为4-8kb。所有作业都可以正常运行几个小时。经过一段时间,我们突然看到一个或多个作业失败:
ava.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:666)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
flink重新启动作业,但在该异常情况下仍会失败。我们已经尝试减少这里建议的民意调查记录:Kafka消费者扔java。lang.OutOfMemoryError:直接缓冲区内存我们还尝试增加kafka堆的大小,建议如下:Flink kafka,java。并行时lang.OutOfMemoryError
失败的原因可能是什么?我们还应该检查什么?
谢谢
您可能低估了一件事,即并行度为5意味着有5 4 3 2 1=18对组合。如果我们将其与链接线程进行比较,可能有3 2 1=6种组合。
在链接线程中,通过将max poll records设置为250来解决问题,因此我的第一个想法是在这里将其设置为80(甚至设置为10),看看这是否解决了问题。
(我不确定需求是否是这样形成的,但唯一值得注意的区别是从3到5的平行度,因此这似乎是一个很好的补偿点)。
问题内容: 我有一个内存泄漏,我已经将其隔离到错误配置的直接字节缓冲区。 GC收集包含这些缓冲区但不处理缓冲区本身的对象。如果实例化包含缓冲区的瞬态对象足够多,则会得到以下令人鼓舞的消息: 我一直在寻找这个问题,显然 和 不工作。 问题答案: 我怀疑您的应用程序某处有对ByteBuffer实例的引用,这阻止了它被垃圾回收。 直接ByteBuffer的缓冲内存是在普通堆之外分配的(以便GC不会移
问题内容: 在编写用于OpenGL库的Matrix类时,我遇到了一个问题,即使用Java数组还是使用Buffer策略存储数据(JOGL为Matrix操作提供直接缓冲区复制)。为了对此进行分析,我编写了一个小型性能测试程序,该程序比较了Arrays vs Buffers和Direct Buffers上循环和批量操作的相对速度。 我想在这里与您分享我的结果(因为我发现它们很有趣)。请随时发表评论和/或
在多生产者设置中,有一个生产者线程和一个消费者线程。消费者可以将新事件发布回同一个环形缓冲区吗?我假设它在缓冲区已满时中断,并且消费者线程在处理当前事件时永远不会获得空闲插槽。换句话说,死锁会发生。 最好的方法是什么?我是否必须引入一种代理线程,它接收来自消费者的事件,并像普通生产者一样将它们发布到环形缓冲区? 补充-为什么有用?假设消费者线程正在处理股市数据事件,它需要向市场模拟器(一个类)发送
问题内容: 由于它不在jvm heap&gc中,何时发布?还是一直保留到流程终止? 但是所有答案都是模糊的,没有一个明确的答案,是否有明确的答案?至少适用于 64位Linux 上的 Java 8 。 __ 问题答案: 不使用旧的Java终结器。相反,它使用内部API。它创建一个新线程并存储到每个创建的线程中(除了重复和切片指的是主缓冲区)。当变成 幻影可到达的 (也就是说,不再存在对字节缓冲区的强
发布者创建reply_to队列并发布到路由密钥,其中包含一条消息,告诉消费者向队列发送响应(RPC协议),以及一个传回的相关id,以便所有未来的结果都与该唯一标识符相关联 Exchange向绑定到该路由密钥的所有队列发送消息。这里,有两个消费者的两个队列,每个都绑定到路由密钥“泵” 一段时间后,消费者回复回队列,然后确认消息,以便他们的唯一队列删除发送到其队列的消息。每个收到消息的消费者都会这样做
谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但