当前位置: 首页 > 知识库问答 >
问题:

如何使用sping-kafka再次发送消息

庾鸿飞
2023-03-14

我们正在使用spring kafka 1.2.2。释放

我们想要的
1.一旦消息被消费并成功处理,就会在spring-kafka中提交偏移量。
我正在使用Manaul Commit/Ac认收它,它工作正常。

2.在任何异常的情况下,我们希望spring-kafka重新发送相同的消息。
我们对任何系统误差抛出RunTime异常,它由spring-kafka记录并且从未提交。
这很好,因为我们不希望它提交,但该消息将保留在spring-kafka中并且永远不会回来,除非我们重新启动服务。在重启时,消息会回来并再次执行,然后留在spring-kafka

我们尝试的
1。我已经尝试了ErrorHandler和RetryingMessageListenerAdapter,但在这两种情况下,我们都必须在服务中编写如何再次处理消息的代码

这是我的消费者

public class MyConsumer{
    @KafkaListener
    public void receive(...){
        // application logic to return success/failure
        if(success){
            acknowledgement.acknowledge();
        }else{
            throw new RunTimeException();
        }
    }
} 

此外,我有以下集装箱工厂的配置

factory.getContainerProperties().setErrorHandler(new ErrorHandler(){
    @Override
    public void handle(...){
        throw new RunTimeException("");
    }
});

在执行流时,控制首先进入这两个模块,以接收然后处理方法。之后,服务将等待新消息。然而,我期待的是,由于我们抛出了一个异常,并且消息未提交,相同的消息应该再次进入receive方法

有没有办法,我们可以告诉spring kafka“不要提交此消息并尽快再次发送?”

共有2个答案

桓信鸥
2023-03-14

不幸的是,我们可以使用的版本是1.3.7。释放

我尝试过实现ConsumerSekAware接口。下面是我如何做到这一点,我可以看到信息传递的再创造

消费者

public class MyConsumer implements ConsumerSeekAware{
    private ConsumerSeekCallback consumerSeekCallback;
    if(condition) {
            acknowledgement.acknowledge();
        }else {
            consumerSeekCallback.seek((String)headers.get("kafka_receivedTopic"),
                    (int) headers.get("kafka_receivedPartitionId"),
                    (int) headers.get("kafka_offset"));
        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> arg0, ConsumerSeekCallback arg1) {
        LOGGER.debug("onIdleContainer called");
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> arg0, ConsumerSeekCallback arg1) {
        LOGGER.debug("onPartitionsAssigned called");
    }
}

配置

public class MyConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Set server, deserializer, group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    @Bean
    public MyConsumer receiver() {
        return new MyConsumer();
    }
}
穆俊名
2023-03-14

不再支持1.2. x;建议1. x用户至少升级到1.3. x(当前为1.3.8),因为它的线程模型要简单得多,这要归功于KIP-62。

当前版本为2.2.2。

2.0.1引入了seekToCurInterrorHandler,它重新查找失败的记录,以便重新交付。

对于早期版本,您必须停止并重新启动容器以重新传递失败的消息,或者向侦听器适配器添加重试。

我建议您升级到最新版本。

 类似资料:
  • 假设kafka消息生产者向一个主题发送一条事件消息。然后一个消费者处理这个事件消息。但是,这个消费者进程因为业务错误而抛出异常,所以他想让消息生产者知道它并再次怨恨。 有什么解决办法吗?

  • 如何保持一个连续的流,以“反应”新的丢弃的文件?(或其他事件,如HTTP GET请求或类似的事件)... 例如,如果我不返回PublisherBuilder的实例,而是返回一个整数,那么我的kafka主题将由一个非常巨大的整数值流填充。这就是为什么示例在发送消息时使用一些间隔... 我应该使用一些CompletationStage或CompletableFuture吗?RXJava2?使用哪个li

  • 我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。

  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。

  • 我有一个工作的WebSocket示例,其中客户端从服务器接收消息。 我不确定当客户端连接时,我应该如何向客户端发送旧消息。 示例: 每个客户端在连接时提供其名称 服务器响应“[名称] 刚刚连接”(对所有客户端) 任何新客户端都不会收到这些消息 我想知道客户端是否有任何方法可以接收旧消息(所有消息或过去 5 分钟内的消息都可以接受)。 我怀疑我可能需要自己捕获这些信息,将其存储在某个地方(如数据库)

  • 如果这是一个重复的问题,我真的很抱歉,但我在其他线程中尝试了许多答案,但没有一个对我有效。 我试图通过使用TLSv1.2协议的SSLSocket向远程服务器发送ISO8583消息,我用密钥库配置了证书,并试图发送一个ISO8583消息示例:08002220010000800000900000011312115000000180105000003 0800:MTI 222001000800000:二