当前位置: 首页 > 知识库问答 >
问题:

我如何知道我的记录是否已使用Spring Kafka手动提交

狄宜然
2023-03-14

我想知道在spring Kafka中,当ackmode设置为manual时,commit是如何工作的。

下面是我在kafkaconfigContainerProperties.SetackMode(AbstractMessageListenerContainer.ackMode.Manual)中设置的属性;

侦听器代码

@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
        countDownLatch.countDown();     
        acknowledgment.acknowledge();
}

我正在按照spring kafka文档执行确认操作,但这只意味着我的消息被标记为已发送但未消费(这是我的理解)。

>

  • 在这种情况下,我是否应该调用commitsync()方法。如果是,我从哪里调用它,因为我需要获得对kafkaconsumer的引用。如果没有,它是如何在内部工作的,我可以跟踪它吗?

    是否有commitid或某个返回的值?我的想法是知道一个特定的消费者记录是否被消费。我想存储该值用于内部跟踪。

    这将真正帮助我区分有多少记录被消耗,有多少记录被挂起以及它们的状态。

  • 共有1个答案

    盛城
    2023-03-14

    我可以回答第一个问题。一切Rest对阿帕奇Kafka来说就像是一个故事。

    由于我们不能从我们想要的地方执行commit,而只能从执行consumer.poll()的同一线程执行,因此我们将所有提交请求存储在内部KafkaMessageListenerContainer队列中,并在执行this.consumer.poll()之前在主使用者循环中查看该请求。

    即使您使用manual_immediate,实际的consumer.commitsync()也是在与您的acknowledgment.acknowledg()不同的线程上执行的。

    consumer中的API如下所示:

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    

    因此,没有任何commitid钩子要处理。

    我认为在Apache Kafka中没有类似于not committed之类的概念。这些数据始终存在于主题日志中,并且直到特定的管理操作或压缩配置才会被删除。

    我认为提交偏移量特性与消费者组的目的和根据JavaDocs我们拥有的:

    * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
    * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
    * should not be used. The committed offset should be the next message your application will consume,
    * i.e. lastProcessedMessageOffset + 1.
    

    因此,当您的使用者死亡时,它将从其组的上次提交的偏移量重新启动。不同的组可以读取相同的数据,但从某个其他偏移。我想这绝对是为什么他们的API没有提供任何到实际状态的挂钩。根本就没有这样一个!

     类似资料:
    • 问题内容: 我有一个表格,基本上是上传一个文件。我要提交两次表格,第1次不包含多部分,第二次1次包含多部分。 但是我想先检查一下第一次提交表单是否成功,然后再进行第二次提交 引用@Vern后编辑 这是我的servlet部分。我在哪里确定它是否由多个部分组成。如果未将 resultType 存储到会话变量中,则返回, 现在,我要检查此“已 提交 ”或类似内容,然后第二次提交表格。 第二表单提交:在这

    • 我想知道记录是否更新了熊猫数据框中的日期。数据框由几列组成,其中对于A的每个值,我们有几个B的值,包括开始日期和结束日期。由于时间戳,我们可以知道是否有新的记录或以前的记录已被修改。 我想知道的是如何能够检查如果一个新记录有一个日期范围接近其他记录在其组中例如B1组如果他们有一个类似的日期范围删除前一个只留下新记录已更新,但如果它没有公共范围来解释为新记录。 例如 输入数据帧: 预期输出: 谢谢你

    • (我已经删除了上一个,所以我想你可以在编辑中找到它) 编辑 所以按照bad_coder在评论中告诉的以及他分享的链接中的大多数答案,我点击了“选项显示所有”,我得到了这个 1) 这里唯一有意义的是点击“”按钮,所以我点击了它 2) 现在我应该用路径替换突出显示的部分(换句话说,我在cmd中键入“where python”后得到的路径) 3) 这就是我发现的。那么我应该在突出显示的部分复制并粘贴哪一

    • 问题内容: 每次我运行使用Flask-SQLAlchemy的应用程序时,都会收到以下警告,提示该SQLALCHEMY_TRACK_MODIFICATIONS选项将被禁用。 我试图找出此选项的作用,但是Flask-SQLAlchemy文档尚不清楚该跟踪的用途。 · 如果设置为True(默认值),Flask-SQLAlchemy将跟踪对象的修改并发出信号。这需要额外的内存,如果不需要,可以将其禁用。

    • 问题内容: 我想知道我是否在通话。 如果我正在通话,请启动服务(服务部分已清除)。我该怎么做呢? 参加通话时,我需要致电服务中心…我不知道该怎么做?有什么帮助吗? 问题答案: 您需要广播接收器… 在清单中声明广播接收器… 还声明使用权限… 广播接收器类… 还有一类可自定义电话状态侦听器…

    • 问题内容: 我正在使用拦截器在基于Struts的应用程序中实现一些功能,而对其生命周期的工作方式却感到困惑。根据Struts的文档(“拦截器”,“写拦截器”和“大图”),它应该像这样工作: 这是有道理的,但是我在如何区分在操作之前执行的拦截器调用与在结果呈现之后执行的拦截器调用之间进行了尝试(我在这里跳过了s)。 如果启动调试器,则会有两个调用, 并且在通过时找不到任何明显的内容 。 ( 更新 :