我有一个玩具Flink工作,从3个Kafka主题中读取,然后联合所有这3个流。仅此而已,没有额外的工作。
如果在我的Flink工作中使用parallelism 1,只要我更改parallelism,一切都会很好
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
为什么它适用于并行1,但不适用于并行
是否与Kafka服务器端设置有关?或者它与我的java代码中的comsumer设置有关(我的代码中还没有特殊的配置)?
我知道这里提供的信息可能不够充分,但我无法接触Kafka集群。我只是希望一些大师可能会遇到同样的错误,并与我分享一些建议。
我用的是Kafka0.10,Flink1.5。
非常感谢。
正如您在错误日志中看到的,此错误来自Kafka集群。当Kafka代理的直接缓冲区内存超过分配给JVM的堆大小时,就会出现此问题。直接缓冲区内存是根据应用程序的需要从JVM堆中分配的。使用并行性时
export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"
但是在您无法访问Kafka代理的情况下(根据您的问题),您可以减少在一次调用中返回的记录数以轮询(),因此对代理中堆内存的需求将减少。(这不是一个标准的解决方案,我建议只是为了消除错误)。
从这个答案中:
Kafka消费者通过以下两个参数处理数据积压,
max.poll.interval.ms
当使用消费者组管理时,在调用投票()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设定了上限。如果在此超时到期之前没有调用投票(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。默认值为300000。
max.poll.records
在一次调用投票()中返回的最大记录数。默认值为500。
忽略根据需求设置上述两个参数可能会导致轮询最大数据,而使用者可能无法处理可用资源,从而导致内存不足或有时无法提交使用者偏移量。因此,始终建议使用max.poll。记录和最大轮询。间隔ms参数。
因此,对于测试,请减小max.poll的值。记录到例如250,并检查是否会发生错误。
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer", Serializer);
properties.setProperty("value.deserializer", Deserializer);
properties.setProperty("max.poll.records", "250");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
我已经红色了多篇文章和讨论,但我仍然有一些不确定性:我不确定是否应该使用或任何其他类型来存储预订-在“在线预订”的意义上(因此来自不同国家/时区的参与者需要在时间线上的同一时刻会面)。我倾向于使用,因为DB和Backend设置为UTC,并且由于传入的“创建预订”json消息包含ISO 8601(带偏移量)start DateTime 让我们采用以下设置:1.数据库(UTC、Oracle、MSSQL
问题内容: 我有这个Java代码段。我是Java的菜鸟。 错误: 码: 问题答案: 是的,这是问题所在: 在课程的最高级别,您只能拥有: 实例初始化程序块() 静态初始值设定块() 变量声明 构造函数声明 方法声明 嵌套类型声明 终结器声明 这些都不是。如果你 的意思 来声明一个变量,你应该这样做: 如果这 不是 您想要的,则应解释您的意图。 编辑:修复此问题后,此编译器错误似乎很明显: Conf
我的问题是关于cucumber特性文件的并行执行。在Selenium Java中,可以通过一个runner类并行运行多个cucumber特性文件吗? 我尝试过不同的方法,但都没有成功。
当我运行程序而不是找到骑士之旅时,我收到了一个StackOverflow错误。任何想法是什么导致了这一点,以及我如何改变我的代码,实际上找到骑士之旅,并摆脱这个错误。项目是为我的CS280课程,并在周五到期,请帮助。谢谢!!
一个能够让程序猿快速开发的炒鸡脚手架,核心技术Spring、JPA、Shiro。 基础环境 JDK1.8、Maven、Mysql、Redis、IntelliJ IDEA、minio、fastdfs 相关组件 - ok-admin - vue - iView - echarts - clipboard - cropperjs - lightbox - nprogress - webuploader
我已经在网站上看到了“解决方案”http://www.rgagnon.com/javadetails/java-0506.html,但它不能正常工作。昨天(六月八日)应该是159,但它说是245。 那么,有没有人用Java解决方案来获取当前日期的三位数朱利安日(不是朱利安日——我需要今年的日期)? 谢谢!马克