当前位置: 首页 > 知识库问答 >
问题:

模拟Kafka CommitFailedException

赵经国
2023-03-14

我试图模拟Kafka抛出的CommitFailedException。

我手动将“session.timeout.ms”设置为10000毫秒,将“enable.auto.commit”设置为false。

kafkaconsumer.poll()之后,我有一个语句thread.sleep(12000),然后我执行提交。我预计,由于线程在下一轮轮询之前的时间为12s,所以使用者应该被标记为已死,并且应该抛出一个CommitFailedException。然而,该过程执行得很顺利。

如何模拟KafkaConsumer引发的异常。

consumer.subscribe(Arrays.asList("foo"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }

            try {
                Thread.sleep(12000);
            }catch (Exception e){
                e.printStackTrace();
            }
            consumer.commitSync();
        }

共有1个答案

井嘉胜
2023-03-14

Kafka通过一个单独的线程使用心跳机制来检查消费者的健康状况。使用者心跳线程必须在session.timeout.ms时间到期之前向代理发送心跳。

heartbeat.interval.ms:在使用Kafka的组管理设施时,向消费者协调器发送心跳的预期间隔时间。heartbeat用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。

session.timeout.ms:当使用Kafka的组管理工具时,用于检测客户端故障的超时。客户端向代理发送周期性心跳以指示其活跃度。如果在此会话超时到期之前,代理没有接收到心跳,则代理将从组中删除此客户端并启动重新平衡。

max.poll.interval.ms:使用使用者组管理时,调用poll()之间的最大延迟。这为使用者在获取更多记录之前可以空闲的时间量设置了一个上限。如果在超时到期之前未调用poll(),则认为使用者失败,组将重新平衡以便将分区重新分配给另一个成员。

如果使用者由于session.timeout.ms中没有心跳或max.poll.interval.ms中没有轮询而被Kafka认为已死亡,使用者将无法提交消息并获得CommitFailedException

CommitFailedException:当使用KafKaConsumer.CommitSync()提交的偏移量失败并出现不可恢复的错误时,会引发此异常。当组重新平衡在成功应用提交之前完成时,可能会发生这种情况。在这种情况下,通常不能重试提交,因为某些分区可能已经分配给组中的另一个成员。

结果;因为heartbeat线程是一个单独的线程,所以代码中的睡眠不会影响它。但在您的情况下,可以将max.poll.interval.ms设置为10秒,以获得CommitFailedException

 类似资料:
  • 问题内容: 我正在开发一个Web应用程序,该应用程序显然在iOS设备中存在问题。问题是我不拥有iOS设备,而是在Linux Ubuntu中进行开发。我正在寻找一种在Linux(尤其是浏览器)中仿真/模拟此OS的方法,但是还没有找到任何东西。 到目前为止,我发现的是iOS SDK的Simulator,但这是针对Mac的。还有一些Windows模拟器。有人做过吗? 问题答案: 我能想到的唯一解决方案是

  • 上面还有第二个问题。当我在Expects块中定义mock类时(如上),似乎只调用了构造函数,而不是,因此没有正确初始化对象。我通过将它移到方法中并在那里实例化该类来解决这个问题。看起来是这样的: 因此,这似乎得到了要调用的正确构造函数,但似乎还在调用。有什么见解吗?

  • 问题内容: 对于单元测试,我需要模拟几个依赖项。依赖项之一是实现接口的类: 我需要设置一个此类的模拟对象,当提供一些指定参数时,该对象将返回一些指定值。 现在,我不确定的是,模拟接口或类是否更好 与 在测试方面有什么不同吗?首选的方法是什么? 问题答案: 在您的情况下,可能不会有太大的区别,但是首选的方法是模拟接口,就像通常情况下,如果您遵循TDD(测试驱动开发),那么即使在编写实现类之前,也可以

  • 问题内容: 我在Python中使用时遇到了一些困难: 测试实际上返回正确的值,但它是Mock对象,不是。您如何在Python库中模拟属性? 问题答案: 您需要使用和: 这意味着:调用时,在该调用的返回值上,为属性设置a以返回value 。

  • 问题内容: 任何人都可以对如何最好地使用EasyMock进行呼叫提出任何建议吗? 我可以将调用移到另一个实现接口的类中的方法中,而不是在理想环境中。 我想知道是否还有其他建议? 问题答案: 你的班级不应该打电话。它应该期望将a 设置为其依赖项,并对其进行处理。然后,在测试中,您可以轻松提供一个模拟并将其设置为依赖项。 作为旁注,我建议您观看有关面向对象设计的本课程以提高可测试性。 更新: 我没有看

  • 我使用JDK1.6.0_24,当我尝试用Mockito模拟HTTP会话时,我出现了下一个错误:

  • 9.1 模拟 现实中有很多问题,如果不利用计算机的话,就很难解决甚至不可能解决。例如天气预 报,古人只能通过肉眼看天来做预测,现代人则通过为大气过程建立数学模型并进行数值计 算来做预测,最新的理论更将确定性模型发展到不确定性模型,从而能对大气这个混沌系统 的行为做出更准确预报。这一切都有赖于计算机模拟(simulation)技术的应用,即利用计 算机为现实问题甚至假想问题建立模型,通过改变一些变量

  • 问题内容: 我该如何与正在测试的班级中的Mockito其他班级进行模拟? 例如: MyClass.java 任何东西PerformerClass.java 并测试: 我是否可以欺骗以排除不必要的逻辑?我可以重写方法来简单地返回还是? 为什么指定Mockito,因为在Robolectric进行Android测试时需要它。 问题答案: 您可以重构,以便它使用依赖注入。不用让它创建实例,您可以将类的实例