当前位置: 首页 > 知识库问答 >
问题:

《SpringKafka》中的延迟ACK

干亮
2023-03-14

我将Spring和Spring Kafka用于一个批处理服务,该服务从Kafka收集数据,直到满足某些条件,然后转储数据。

我想在数据离开我的服务时确认提交,但它可能会在内存中停留5-10分钟。

是否有任何其他方法来确认/提交来自Spring Kafka的偏移量,只给出分区/偏移量信息?

共有1个答案

柯瀚玥
2023-03-14

您可以将ackmode.timeackmode.count与非常大的acktimeackcount一起使用,这样容器将永远不会执行ACK。

然后,将使用者<?,?>传递到您的侦听器方法中,并自己执行偏移量提交。

但是,请注意,使用者不是线程安全的,因此必须在侦听器线程上执行提交。

此外,您可能需要将max.poll.interval.ms增加到默认值(5分钟)以外,以避免分区的重新平衡。

 类似资料:
  • 问题内容: 我需要在循环中对数据库进行SQL查询: 更好的方法是:保持原样或循环后移动: 或者是其他东西 ? 问题答案: 整个要点是直到函数返回才执行,因此将其放置在要关闭的资源打开后的适当位置。但是,由于要在循环内创建资源,因此根本不要使用defer- 否则,在函数退出之前,您不会关闭在循环内创建的任何资源,因此它们会堆积直到然后。相反,您应该在每次循环迭代结束时关闭它们, 而无需 :

  • 问题内容: 我正在尝试使用新的React Lazy和Suspense创建后备加载组件。这很好用,但后备时间仅显示几毫秒。有没有办法增加额外的延迟或最短时间,因此我可以在渲染下一个组件之前显示该组件的动画? 现在懒导入 等待组件: 我可以做这样的事情吗? 问题答案: 函数应该返回对象的承诺,该对象由具有默认导出功能的模块返回。不会返回承诺,也不能那样使用。尽管任意承诺可以: 如果目标是提供 最小的

  • 什么叫中断延迟 所谓中断延迟,其实在编写裸机程序中经常被用到,我们时常需要靠中断来捕获相关的数据或者事件输入,然后在中断中置起相关标志位,在主函数中的死循环中检查这个标志位,如果置起,说明中断发生了,然后进行相应的处理。与之类似的,freeRTOS的中断延迟是一个道理,中断服务函数不做相关的处理,而是由随后运行的任务来处理,这相比直接在中断服务函数中处理会有一个延迟,好处是不会占用太多的中断时间,

  • 问题内容: 我正在编写一个小的脚本,该脚本在页面加载时将CSS子类分配给三个元素。800ms之后,我希望它删除该子类。 我认为这段代码可以做到: 遗憾的是,任何帮助将不胜感激。提前致谢。 问题答案: 您可以使用 功能: 在指定的延迟后调用函数或执行代码段。

  • 我在一个公认的缓慢配置中设置了Kafka——但我不期待我看到的数字。 我将集群设置为<code>LogAppendTime</code>,因此我正在测量事件写入Kafka(由代理决定)与服务接收到事件之间的时间。代理和应用程序都位于“同一位置”,因此服务器之间的ping时间很短,时钟应该同步或接近。 我看到延迟在 到 600ms 之间,很多是 ......巨大的差异让我觉得我的设置出了问题。它因消

  • 我在一个由三台机器组成的集群上使用cassandra 2.1.12,每台机器都有32 GB的RAM和4个内核(在Amazon AWS上) 我使用的是cassandra的所有默认配置。 我用它来进行我的网站事件分析(时间序列数据),每天的数据约为1 GB,复制因子为3。 我的数据在每台机器上已经增长到85 GB左右,现在它的读取延迟约为 我的行很少更新,所以,我没有使用Levelorder Comp