当前位置: 首页 > 知识库问答 >
问题:

区分如何在异步Kafka producer中处理异常

白宏放
2023-03-14

当生成消息到Kafka时,您可以得到两种错误:可检索和不可检索。在处理它们时,你应该如何区分它们?

我希望异步生成记录,将回调对象接收到不可重试异常的记录保存在另一个主题(或HBase)中,并让生产者为我处理所有接收到可重试异常的记录(最多尝试次数,当它最终到达时,会成为第一批异常之一)。

我的问题是:尽管有回调对象,但生产者是否仍会自行处理可检索的异常?因为在接口回调中说:

可重试异常(暂时的,可通过增加#.重试次数来覆盖)

代码可能是这样的吗?

producer.send(record, callback)

def callback: Callback = new Callback {
    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
      if(null != e) {
         if (e == RecordTooLargeException || e == UnknownServerException || ..) {
            log.error("Winter is comming")
            writeDiscardRecordsToSomewhereElse
         } else {
            log.warn("It's no that cold") //it's retriable. The producer will keep trying by itself?
         }
      } else {
        log.debug("It's summer. Everything is fine")
      }
    }
}

Kafka版本:0.10.0

任何光将被赞赏!:)

共有1个答案

范鸿
2023-03-14

正如《Kafka圣经》(又名《Kafka权威指南》)所说:

缺点是,虽然commitSync()将重试提交,直到提交成功或遇到不可重试的失败,但commitSync()不会重试。

原因是:

“不重试”是指,当commitAsync()从服务器接收到响应时,可能已经有一个已成功的后续提交。

假设我们发送了一个提交偏移量2000的请求。存在临时通信问题,因此代理永远不会收到请求,因此永远不会响应。同时,我们处理了另一个批次并成功提交了偏移量3000。如果委员会同步()现在重试以前失败的提交,它可能会在已经处理和提交了偏移量3000之后成功提交偏移量2000。在再平衡的情况下,这将导致更多的重复。

除此之外,您仍然可以创建一个递增的序列号,每次提交时都可以增加该序列号,并将该序列号添加到回调对象中。当重试时间到来时,只需检查Acc的当前值是否等于您给回调的数字。如果是这样,它是安全的,您可以执行提交。否则,将出现新的提交,您不应重试此偏移量的提交。

这似乎有很多麻烦,这是因为如果你正在考虑这个问题,你应该改变你的策略。

 类似资料:
  • 如果这被认为是一个可接受的实践,我需要什么-如果有-错误处理?我的理解是,task.wait()将重新抛出异步操作抛出的任何异常,并且我没有提供任何取消异步操作的机制。仅仅调用task.wait()就足够了吗?

  • 在Servlet 3.0中,引入了异步处理的概念。所以所有的书都说这消除了每个请求一个线程的要求。我已经测试过了,是的,它确实有效。现在,我有一个简单的servlet,用户在其中以同步模式启动HTTP请求。线程只需Hibernate1秒,然后回复客户端。当我对这种模式进行负载测试时,服务器每秒只能处理4个请求。现在,我将同步模式更改为异步模式,并根据请求创建一个新线程,将原始http线程释放回池。

  • 我目前正在尝试为spring boot实现一个自定义的错误处理程序,我已经用以下方法实现了它: 不知为什么这不起作用,并且异常仍然被抛给客户端,是否有某种方法捕获方法抛出的异常并忽略它。

  • 问题内容: 什么是处理这种情况的最佳方法。我处于受控环境中,所以我不想崩溃。 从setTimeout内抛出时,我们将始终获得: 如果抛出发生在setTimeout之前,那么bluebirds catch将捕获它: 结果是: 很棒-但是如何在节点或浏览器中处理这种性质的恶意异步回调。 问题答案: 承诺不是域,它们不会捕获异步回调中的异常。你就是做不到。 然而诺言来捕捉从内抛出的异常/ / 构造函数的

  • 我有三项任务。任务1、任务2和任务3。task1和task2是异步任务,即它们同时执行任务,返回完成结果的时间不可预测。最初,我希望task1和task2同时执行,并在得到结果后执行task3。 我设计的结构如上所述,但我总是得到不好的结果,比如如果task2是一个url发布请求,使用afnet,我不能得到成功块参与。 任何帮助我的人我都会感激的。