我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。
对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。
发送消息成功-kafka-producer-network-thread 00:59:17.522
00:59:16.850 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.3.1
00:59:16.858 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:59:16.863 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer
00:59:17.326 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent
:::
00:59:17.522 [kafka-producer-network-thread | producer-1] TRACE o.s.kafka.core.KafkaTemplate - Sent ok
00:27:33.773 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:27:33.779 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer
00:27:33.957 [kafka-producer-network-thread | producer-1] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test1=TOPIC_AUTHORIZATION_FAILED}
00:27:33.957 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Topic authorization failed for topics [test1]
00:27:33.958 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BgLUXrqZSLKvOw2Kn0nhVQ
00:27:33.973 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message
:::
00:27:33.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.kafka.core.KafkaTemplate - Failed to send
00:27:33.977 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE
o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@57033d15] close(PT5S)
00:27:33.982 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent: ProducerRecord
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
this.logger.trace(() -> "Sent: " + producerRecord);
return future;
}
private Callback buildCallback
return (metadata, exception) -> {
try {
if (exception == null) {
:::
KafkaTemplate.this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
}
else {
:::
KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
}
CustomerDao.InsertCustomer调用回滚,但仍然发送了kafka消息。如果在customer事件上有一个使用者,该事件将客户插入数据仓库,则在转换回滚时,数据仓库和记录系统将不同步。有没有办法让Kafka活页夹在这里是事务性的?
通过 allowNon 事务属性,可以使用相同的 KafkaTemplate 来创建事务性和非事务性生产者。这两种生产者将使用来自Kafka生产者工厂的相同配置。 在某些特定情况下,可能需要为这两种生产者使用不同的配置。 下面的例子展示了一个具体的用例 KafkaTemplate-非事务性生产者- 问题可以通过拥有2个工厂和/或2个模板来解决,但为了避免样板代码和复杂性,我们的想法是保持相同的模板
我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息
我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
我正在查看一些现有的代码,并想知道在下面的场景中使用Spring的@Transactional注释会发生什么?考虑以下示例: 下面的updateDataBaseItem()方法是常见的,可以从其他非事务性方法和上面的方法调用: