当前位置: 首页 > 知识库问答 >
问题:

会话超时时Kafka侦听器回滚事务

燕琛
2023-03-14

我正在使用spring-kafka版本1.1.3来使用来自主题的消息。在使用者配置中,自动提交设置为true,而max.poll.records设置为10session.timeout.ms与服务器协商为10秒。

在收到消息后,我将它的一部分保存到数据库中。我的数据库有时会非常慢,这会导致kafka侦听器会话超时:

组MyGroup得自动偏移量提交失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员.这意味着对poll()的后续调用之间的时间比配置的session.timeout.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。

由于我无法增加服务器上的会话超时,并且max.poll.records已降至10,因此我希望能够将数据库调用包装在事务中,该事务将在kafka会话超时的情况下回滚。

这可能吗?我怎样才能做到这一点?

不幸的是,我在文件中找不到解决办法。

共有1个答案

洪祺
2023-03-14

您必须考虑升级到Spring Kafka1.2和Kafka0.10.x。旧的阿帕奇Kafaka有一个缺陷与心跳。因此,使用autocommit和slow listener,您最终会出现意外的重新平衡,这样的问题就由您自己解决了。你用的SpringKafaka的版本有这样一个逻辑:

// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
    invokeListener(records);
}
else {
    if (sendToListener(records)) {
        if (this.assignedPartitions != null) {
            // avoid group management rebalance due to a slow
            // consumer
            this.consumer.pause(this.assignedPartitions);
            this.paused = true;
            this.unsent = records;
        }
    }
}

因此,您可以考虑关闭autocommit,并依赖默认打开的内置pause特性。

 类似资料:
  • 我对处理SpringAMQP监听器超时能力有一个要求,即我们从生产者那里发送一条消息,Spring AMQP的消费者监听器线程收到这条消息,但是说需要很多时间来执行自己并被挂起,这最终会导致监听器线程被呈现无法使用。 那么,有没有办法让Spring AMQP提供任何使用者超时设置,以便在给定超时时间后再次释放侦听器线程

  • 如果http请求超时,我试图中断当前线程。我已经为Kafka事务设置了PlatformTransactionManager作为bean。我在方法级别使用@Transactional注释。我们将发布三个主题的信息。在第一个主题中发布消息后,我将添加线程。睡眠(5000),如果执行时间超过6秒,当前线程将从筛选器中断。所以这里的通话被打断了,但信息被发布给了Kafka。我们只是在传达信息。我们不消费任

  • 在 HTTP 协议中,当客户端不再处于活动状态时没有显示的终止信号。这意味着当客户端不再处于活跃状态时可以使用的唯一机制是超时时间。 Servlet 容器定义了默认的会话超时时间,且可以通过 HttpSession 接口的 getMaxInactiveInterval 方法获取。开发人员可以使用HttpSession 接口的 setMaxInactiveInterval 方法改变超时时间。这些方法

  • 问题内容: 我在用户登录时创建会话,如下所示: 如何在X分钟的会话上指定超时,然后在达到X分钟后让它执行功能或页面重定向? 编辑:我忘了提及由于不活动,我需要会话超时。 问题答案: 首先,存储用户最后一次发出请求的时间 在后续请求中,检查他们多久之前提出了请求(在此示例中为10分钟)

  • 我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!

  • 问题说明: