当前位置: 首页 > 知识库问答 >
问题:

Kafka consumer分区获取最大字节的奇怪行为

能旭
2023-03-14

我有一个主题a,有12个分区。我在一个集群中有3个Kafka经纪人。对于主题A,每个代理有4个分区。我没有创建任何副本,因为我不关心恢复能力。

我有一个简单的Java消费者使用kafka客户端库。我在属性中提到了以下内容

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-serverA:9092,kafka-serverB:9092,kafka-serverC:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty("max.partition.fetch.bytes", "100000");

消费者记录和打印记录的代码更多,工作正常。我在主题中有12条消息,我通过“kafka-run-class.shkafka.admin.消费者组命令”验证了每个分区中都有一条消息。消息大小为100000字节,正好等于max.partition.fetch.bytes限制。

当我轮询时,我应该会看到12条消息作为响应返回。然而,响应非常不稳定。有时我看到来自4个分区的消息,表明只有一个代理正在响应消费者请求,或者有时我看到8个。我从未得到所有12个分区的响应。只是为了测试,我删除了max.partition.fetch.bytes属性。我观察到了同样的行为。

我遗漏了什么吗?引导配置中的serve1、server2、server3似乎在服务请求时没有选择所有3个代理。

非常感谢您的帮助。我在不同的机器上运行经纪人和消费者,他们的规模足够大。

共有1个答案

叶茂
2023-03-14

我遗漏了什么吗?引导配置中的serve1、server2、server3似乎在服务请求时没有选择所有3个代理。

在您的Kafkabootstrap.servers属性中,您列出了所有好的代理。其中一个代理将被选中以获取元数据,这基本上是有关主题有多少分区以及哪个代理是这些分区的领导者的信息。

无论选择哪个服务器,都应该提供有关另一个服务器的信息。

检查您的所有代理是否相互认识,即他们属于同一个Kafka群集,即他们指向同一个zookeeper实例。

您提到的所有代理IP必须可供您的消费者访问。因此,请确保已设置适当的播发。侦听器属性。

例如,如果播发。侦听器=纯文本://1.2.3.4:9092然后消费者必须可以访问1.2.3.4:9092

此外,默认情况下,Kafka消息会在一段时间后定期自动提交,如果某个消费者使用特定的组ID读取消息,则不会再次使用这些消息,因为它们已提交。因此,您也可以尝试更改组。id属性并重新检查。

此外,请检查是否正在使用相同的组id运行多个使用者,在这种情况下,一些分区将分配给一个使用者,另一些分配给另一个使用者。

您可以通过使用kafka console consumer(Kafka控制台消费者)和给出主题的从头标记来解决此问题,并查看是否所有消息都已被使用。

您可能还需要检查default.api.timeout.ms参数并尝试增加值,以防出现任何网络拥塞导致客户端从一个引导服务器切换到另一个。

 类似资料:
  • 问题内容: 我有一个小文件,其中包含一些我想用“ |”分割的内容 字符。 当我尝试使用其他任何字符(例如“>”)时,它都可以正常工作,但是使用“ |” 性格,有一些意想不到的结果。 行本身(此处带有 >字符) addere> to add>(1) 分割“ >”结果 [加法,加法(1)] 分割“ |” 结果 [,a,d,d,e,r,e,|,t,o,,a,d,d,|,(,1,)] 为什么要拆分所有内容

  • 我有以下代码: 假设我现在将电脑的时区设置为太平洋时间(PDT为UTC-7),则打印 2012年6月29日星期五08:15:00太平洋标准时间 PDT不是比IST(印度标准时间)晚12.5小时吗?这个问题在任何其他时区都不会发生-我尝试了UTC、PKT、MMT等,而不是日期字符串中的IST。Java中有两个IST吗? 注意:实际代码中的日期字符串来自外部源,因此我不能使用GMT偏移量或任何其他时区

  • 问题内容: 我看到了我认为是错误的行为。@InjectMocks似乎并没有在每种测试方法之前创建一个新的测试主题。就像@Mock一样。在下面的示例中,如果Subject.section是最后一个,则@Test失败。如果不是最后两个都通过。我当前的解决方法是使用@BeforeClass,但这并不理想。 Subject.java: Section.java: SubjectTest.java 干杯。

  • 我调用下面的方法来计算一些值。我提供agencyID和月份的整数表示来执行计算。 等等...直到我到达12月(值12)。然而,当我到达5月(月值5)时,代码给出了一个运行时错误。然而,奇怪的是这个错误被抛出 在我的查询中,我没有在查询中的任何位置包含shift\u dayId,如下所示: 我恳请帮助我确定为什么givenMonth在值大于4时抛出运行时异常。 谢谢你 编辑 这是我要求的ShiftD

  • 问题内容: 令人难以置信。为什么输出-124? 问题答案: 在Java中,an 是32位。A 是8 。 最原始的类型Java中的签名,,,,和long被编码在二进制补码。(类型为unsigned,并且sign的概念不适用于。) 在此数字方案中,最高有效位指定数字的符号。如果需要更多位,则将最高有效位(“ MSB”)简单复制到新的MSB中。 因此,如果你具有 并将其表示为 32位,则只需将1复制到左

  • 我有以下代码来解析一个JSON文件: 要处理以下JSON文件: 如果我执行此代码,我将收到以下错误: 所以我开始一步一步地调试应用程序,看看part processing()中的哪个代码部分抛出了这个异常。令人惊讶的是,那里的所有代码都正常执行:没有抛出异常,也没有返回结果I except。 更让我惊讶的是,当我稍微改变第一种方法的代码时,它可以在不产生异常的情况下工作。 我不知道println方