当前位置: 首页 > 知识库问答 >
问题:

executeInTransaction内部的生产者失败

乜栋
2023-03-14

我们也正在手动更新成功提交后的偏移。KafkaTransactionManager用于维护事务。由于消息是通过RestController发布到firstTopic的,我们的@Transactional从那里开始,在偏移更新时结束。为此,我们使用executeInTransaction()。

Kafka造型


@Bean
public ProducerFactory producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    config.put(ProducerConfig.RETRIES_CONFIG, 10);
    config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
    DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
    defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
    //defaultKafkaProducerFactory.transactionCapable();
    return defaultKafkaProducerFactory;
    //return new DefaultKafkaProducerFactory<>(config);
}

/**
 * New configuration for the consumerFactory added
 *
 * @return
 */
@Bean
public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<User>(User.class));
}


@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setTransactionManager(transactionManager());
    factory.setRetryTemplate(kafkaRetry());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(getErrorHandler());
    factory.setRecoveryCallback(retryContext -> {
        //implement the logic to decide the action after all retries are over.
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        System.out.println("Recovery is called for message  " + consumerRecord.value());
        return Optional.empty();
    });

    return factory;
}


public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();

    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(10 * 1000);
    backOffPolicy.setMultiplier(1);
    backOffPolicy.setMaxInterval(60 * 1000);       // original 25 * 60 * 1000
    retryTemplate.setBackOffPolicy(backOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(4);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}


public SeekToCurrentErrorHandler getErrorHandler() {
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {

        @Override
        public void handle(Exception thrownException,
                           List<ConsumerRecord<?, ?>> records,
                           Consumer<?, ?> consumer,
                           MessageListenerContainer container) {
            //super.handle(thrownException, records, consumer, container);
            if (!records.isEmpty()) {
                ConsumerRecord<?, ?> record = records.get(0);
                String topic = record.topic();
                long offset = record.offset();
                int partition = record.partition();

                if (thrownException instanceof DeserializationException) {
                    System.out.println("------1111------deserialization exception ");
                } else {
                    System.out.println("------xxxx------Record is empty ");
                    consumer.seek(new TopicPartition(topic, partition), offset);
                }
            } else {
                System.out.println("------4444------Record is empty ");
            }

        }
    };

    return errorHandler;
}

Rest控制器

@RestController
@RequestMapping("accounts")
public class UserResource {

    @Autowired
    KafkaTemplate<String, User> kafkaTemplate;


    @PostMapping("/users")
    @Transactional
    public String postComments(@RequestParam("firstName") final String firstName,
                               @RequestParam("lastName") final String lastName,
                               @RequestParam("userName") final String userName) {

        List<String> accountTypes = new ArrayList<String>();
        kafkaTemplate.send("firstTopic", new User(firstName, lastName, userName));

        return "Message sent to the Error queue";
    }

}
@Service
public class KafkaMessageConsumerService {

    @Autowired
    KafkaTemplate<String, User> kafkaTemplate;

    int index = 0;

    @KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
    //@Transactional
    public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {            


        kafkaTemplate.executeInTransaction(t -> {
            int number = (int) (Math.random() * 10);


            t.send("secondtopic", user);
            if (number % 2 == 0 || number % 2 == 1) {
                System.out.println("about to ack");
                throw new RuntimeException(" Modulus is zero ");
            }
            acknowledgment.acknowledge();

            return true;
        }); 
    }


}

这是解决生产者失败的正确方法吗?

有没有方法在第10次重试后捕获异常并根据业务规则处理消息?我们可以保留2个RetryTemplates和SeekToErrorHandlers吗?

共有1个答案

阎晗日
2023-03-14

您不应该在那里使用executeintransaction(除非您确实想在其他事务中执行发布)。侦听器容器已启动事务。

此外,acknowledge()将偏移量发送到容器的事务,而不是模板启动的事务。

这可能就是为什么您在确认上会出现奇怪的错误;我不知道.

 类似资料:
  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 我们有一个ActiveMQ Artemis 2.14.0实例,其中包含队列和生成器,该实例有时会失败,并出现以下错误: 我们需要了解可能出现这种情况的情况。我们看到这种情况时有发生,也不知道确切的情况。有人经历过这一点并能提供任何解决方案吗?

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 我正在用Springboot做一个简单的Kafka示例项目,我遇到了一个错误,制作人没有创建,但其余的工作正常。 我遇到的错误似乎引发了异常,因为制作人没有创建,但没有解释原因,我也不知道: 这是我的kafka配置: 这里是控制器,endpoint“/api/kafka”:

  • 我读了一些RxJava中的背压文档,但我找不到详细的解释,比如它是如何在库中内部发生的,每个人都只是总结说“生产者”太快,“消费者”太慢。 例如,如下面的代码: 我已经看过了RxJava源代码,所以我的理解是,在主线程中,我们将每毫秒发出一次事件,一旦发出,我们就将值传递给系统。出来println(i)方法,并将其扔进newhead调度器的线程池,然后在可运行程序中运行该方法。 所以我的问题是,异

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要