当前位置: 首页 > 知识库问答 >
问题:

在Spring-Kafka中,哪一个方法做相同的事情,比如onPartitionsRevoked?

葛嘉悦
2023-03-14

我知道在斯平-Kafka中我们有以下方法:

无效registerSeekCallback(ConsumerSeekCallback回调);

void onIdleContainer(映射分配、ConsumerSeekCallback回调);

但哪一个与PartitionsRevoked上的本机ConsumerRebalanceListener方法相同?

“将在重新平衡操作开始之前和使用者停止提取数据之后调用此方法。建议在此回调中将偏移量提交到Kafka或自定义偏移量存储区,以防止重复数据。”

==========更新=======

嗨,加里,当我将RebalanceListener添加到ContainerProperties中时。我可以看到两种方法都被触发了。但是,我得到了一个例外,说“提交无法完成,因为组已经重新平衡并将分区分配给了另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间处理消息”,你知道吗?

============更新2==============

    public ConcurrentMessageListenerContainer<Integer, String> createContainer(
      ContainerProperties containerProps, IKafkaConsumer iKafkaConsumer) {

    Map<String, Object> props = consumerProps();

    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);

    **RebalanceListner rebalanceListner = new RebalanceListner(cf.createConsumer());**

    CustomKafkaMessageListener ckml = new CustomKafkaMessageListener(iKafkaConsumer, rebalanceListner);

    CustomRecordFilter cff = new CustomRecordFilter();

    FilteringAcknowledgingMessageListenerAdapter faml = new FilteringAcknowledgingMessageListenerAdapter(ckml, cff, true);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(5);

    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1500); // 1.5 seconds

    RetryTemplate rt = new RetryTemplate();
    rt.setBackOffPolicy(backOffPolicy);
    rt.setRetryPolicy(retryPolicy);
    rt.registerListener(ckml);
    RetryingAcknowledgingMessageListenerAdapter rml = new RetryingAcknowledgingMessageListenerAdapter(faml, rt);

    containerProps.setConsumerRebalanceListener(rebalanceListner);
    containerProps.setMessageListener(rml);
    containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProps.setErrorHandler(ckml);
    containerProps.setAckOnError(false);
    ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(
        cf, containerProps);

    container.setConcurrency(1);
    return container;
  }

共有1个答案

姬和豫
2023-03-14

您可以将ReBalanceListener添加到传递给构造函数的容器的ContainerProperties中。

 类似资料:
  • 问题内容: 我的想法是,人们使用Docker来确保本地环境与生产环境相同,并且我可以停止考虑他们的应用程序在物理上运行的位置,并且平衡机制应该暂时将应用程序分配在最佳位置。 我是100%基于Web的人,我将与我们的数据库一起迁移到云中,并且无法移动的内容将无缝地桥接在一起,因此企业内容和云将成为一个子网。 所以我想知道,也许Service Fabric已经完成了与Docker相同的工作,并且它提供

  • 我想要将kafka事务与存储库事务同步: 如果我能获得一个简单的kafka事务与存储库事务同步的示例和一个解释,我会真正帮助我。

  • 我想用相同的方法在一个事务中插入/更新两个表中的记录。 首先插入表1,如果成功,则更新表2,然后如果两者都成功,则只提交,否则不提交(在两个表中) 我的代码如下所示: 但是当我测试代码时,如果我在持续表1行之后终止进程,它仍然会在第一个表中插入记录。所以它不认为这是块事务。 我用的是mysql。 补充一下:上面的表1和表2都使用单独的事务管理器和entityManagerFactory 有人能帮忙

  • Zookeeper实现了一个核心的操作集合,使得对于许多分布式应用非常常见的任务得以实现。你知道多少个应用是需要一个master,或者需要追踪哪个进程是可相应的?然而,Zookeeper不会为你实现那些任务。它不会选举一个领导者,也不会追踪那些活着的进程。相反,它提供了许多工具来实现这些任务。开发者自己觉得他们要实现的是哪种协同任务。

  • 问题内容: 具有相同方法名称和签名的两个接口。但是由单个类实现,那么编译器将如何确定哪个方法用于哪个接口? 例如: 问题答案: 如果一个类型实现两个接口,并且每个接口定义一个具有相同签名的方法,则实际上只有一个方法,并且它们是不可区分的。例如,如果这两个方法的返回类型冲突,那么它将是编译错误。这是继承,方法重写,隐藏和声明的一般规则,并且不仅适用于两个继承的interface方法之间的可能冲突,还

  • 然后,我对一个方法使用了注释,该方法执行以下操作: 这不起作用。是事务性的,但是当调用方法时,没有正在进行的事务,并且我得到一个。 我打算尝试方法,但Javadoc声明这只用于本地事务,因此它似乎不符合我的需要。 我的下一步是尝试直接使用Kafka的Producer API,看看这种模式是否有效,但如果有人能告诉我知道我在浪费时间,Kafka不支持事务性地写多个主题,我会很感激。 我确实在Conf