我们也正在手动更新成功提交后的偏移。KafkaTransactionManager用于维护事务。由于消息是通过RestController发布到firstTopic的,我们的@Transactional从那里开始,在偏移更新时结束。为此,我们使用executeInTransaction()。
Kafka造型
@Bean
public ProducerFactory producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.RETRIES_CONFIG, 10);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
//defaultKafkaProducerFactory.transactionCapable();
return defaultKafkaProducerFactory;
//return new DefaultKafkaProducerFactory<>(config);
}
/**
* New configuration for the consumerFactory added
*
* @return
*/
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<User>(User.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setTransactionManager(transactionManager());
factory.setRetryTemplate(kafkaRetry());
factory.setStatefulRetry(true);
factory.setErrorHandler(getErrorHandler());
factory.setRecoveryCallback(retryContext -> {
//implement the logic to decide the action after all retries are over.
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
System.out.println("Recovery is called for message " + consumerRecord.value());
return Optional.empty();
});
return factory;
}
public RetryTemplate kafkaRetry() {
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(10 * 1000);
backOffPolicy.setMultiplier(1);
backOffPolicy.setMaxInterval(60 * 1000); // original 25 * 60 * 1000
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(4);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
public SeekToCurrentErrorHandler getErrorHandler() {
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
//super.handle(thrownException, records, consumer, container);
if (!records.isEmpty()) {
ConsumerRecord<?, ?> record = records.get(0);
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
if (thrownException instanceof DeserializationException) {
System.out.println("------1111------deserialization exception ");
} else {
System.out.println("------xxxx------Record is empty ");
consumer.seek(new TopicPartition(topic, partition), offset);
}
} else {
System.out.println("------4444------Record is empty ");
}
}
};
return errorHandler;
}
Rest控制器
@RestController
@RequestMapping("accounts")
public class UserResource {
@Autowired
KafkaTemplate<String, User> kafkaTemplate;
@PostMapping("/users")
@Transactional
public String postComments(@RequestParam("firstName") final String firstName,
@RequestParam("lastName") final String lastName,
@RequestParam("userName") final String userName) {
List<String> accountTypes = new ArrayList<String>();
kafkaTemplate.send("firstTopic", new User(firstName, lastName, userName));
return "Message sent to the Error queue";
}
}
@Service
public class KafkaMessageConsumerService {
@Autowired
KafkaTemplate<String, User> kafkaTemplate;
int index = 0;
@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
//@Transactional
public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {
kafkaTemplate.executeInTransaction(t -> {
int number = (int) (Math.random() * 10);
t.send("secondtopic", user);
if (number % 2 == 0 || number % 2 == 1) {
System.out.println("about to ack");
throw new RuntimeException(" Modulus is zero ");
}
acknowledgment.acknowledge();
return true;
});
}
}
这是解决生产者失败的正确方法吗?
有没有方法在第10次重试后捕获异常并根据业务规则处理消息?我们可以保留2个RetryTemplates和SeekToErrorHandlers吗?
您不应该在那里使用executeintransaction
(除非您确实想在其他事务中执行发布)。侦听器容器已启动事务。
此外,acknowledge()
将偏移量发送到容器的事务,而不是模板启动的事务。
这可能就是为什么您在确认上会出现奇怪的错误;我不知道.
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
我们有一个ActiveMQ Artemis 2.14.0实例,其中包含队列和生成器,该实例有时会失败,并出现以下错误: 我们需要了解可能出现这种情况的情况。我们看到这种情况时有发生,也不知道确切的情况。有人经历过这一点并能提供任何解决方案吗?
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
我正在用Springboot做一个简单的Kafka示例项目,我遇到了一个错误,制作人没有创建,但其余的工作正常。 我遇到的错误似乎引发了异常,因为制作人没有创建,但没有解释原因,我也不知道: 这是我的kafka配置: 这里是控制器,endpoint“/api/kafka”:
我读了一些RxJava中的背压文档,但我找不到详细的解释,比如它是如何在库中内部发生的,每个人都只是总结说“生产者”太快,“消费者”太慢。 例如,如下面的代码: 我已经看过了RxJava源代码,所以我的理解是,在主线程中,我们将每毫秒发出一次事件,一旦发出,我们就将值传递给系统。出来println(i)方法,并将其扔进newhead调度器的线程池,然后在可运行程序中运行该方法。 所以我的问题是,异
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要