当前位置: 首页 > 知识库问答 >
问题:

为什么我所有的Kafkapromise不是真的promise?(0.9.0.1)

万德海
2023-03-14
Reader 1 -> Partition 0 & 1
Reader 2 -> Partition 2 & 3

我从分区中读取,并对我读取的每个ConsumerRecord执行consumer.commitasync(在理解行为之前,在这一点上故意不对提交进行批处理)。

我在commit async回调中放置了一个每个主题的计数器来度量它被调用的次数,总数为100万次。

在应用程序稳定下来并停止之后,我使用Kafka CLI工具查看我的偏移量,我得到如下内容:

Group           Topic                          Pid Offset          logSize         Lag             Owner
group1          lowercaseStrings               0   233788          250000          16212           none
group1          lowercaseStrings               1   249999          250000          1               none
group1          lowercaseStrings               2   249999          250000          1               none
group1          lowercaseStrings               3   233788          250000          16212           none

为什么我还会有这种滞后?什么可能导致这一切?

共有1个答案

酆英达
2023-03-14

Kafka批量获取消息。当您在消息流中调用next()时,可能会发生两件事:-要么本地没有数据,用户将在内部调用poll(),这将更新其偏移量状态-要么有一些数据,您将在本地批处理中前进。

当您调用commitAsync()时,您提交了在上次poll()调用中获得的偏移量,而不是当前通过内部迭代有效地执行的偏移量。

您可以通过减小接受的批处理的大小(在配置中设置batch.size)来控制这种行为,如果您真的想对每个消息进行poll(),则将其降至0。我希望将此配置降至0,所有地方都将达到0滞后(注意,对于可伸缩的使用,这将破坏吞吐量)。

 类似资料:
  • 我的代码: 当我尝试运行这样的东西时: 我得到了: 但为什么? 我的主要目标是将返回承诺的中的token转换为一个变量。然后才预形成一些动作。

  • 下面的代码将输出“true”,这意味着Array()为true。在Python中,list()为False,这是否只是因为语言设计者的偏好?

  • 问题内容: 我的代码: 当我尝试运行这样的东西时: 我越来越: 但为什么? 我的主要目标是将令牌(从令牌中返回承诺)转换为变量。然后才执行一些操作。 问题答案: 只要其结果尚未解决,promise将始终记录未决。无论promise状态如何(已解决或仍处于待处理状态),您都必须调用promise来捕获结果: 这是为什么? 承诺只是向前的方向;您只能解决一次。a的解析值传递给其或方法。 根据Promi

  • 我想运行多个shell进程,但当我尝试运行63个以上时,它们会挂起。当我将线程池中的最大线程数减少到n时,它会在运行shell命令后挂起。 正如您在下面的代码中所看到的,问题不在于启动块本身,而在于包含shell命令的启动块: 正在运行<代码>/process\u items foo bar baz给出以下输出,在处理条形图之后挂起,该输出正好位于使用shell运行的th(此处为2)线程之后: 我

  • 我正试图按照本教程学习pygame。我正试图繁殖敌人的物体,但我认为我犯了一个错误,因为没有显示任何东西。 据我所知,我必须创建一个敌人类,然后添加一个事件来添加敌人(ADDENEMY)。当该事件被调用时(由于计时器的作用,每250毫秒一次),它应该会显示一个向左移动的敌人,直到它到达显示的末尾。 我有一组敌人精灵和一组所有精灵,我使用screen.blit和for循环将所有精灵中的每个实体渲染到

  • 本文向大家介绍什么是Promise?相关面试题,主要包含被问及什么是Promise?时的应答技巧和注意事项,需要的朋友参考一下 Promise可以帮助我们更好地处理异步操作。下面的实例中,100ms后会打印result字符串。catch用于错误处理。多个Promise可以链接起来。