当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka:非事务性生产者回调处理

白光耀
2023-03-14

我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。

对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。

发送消息成功-kafka-producer-network-thread 00:59:17.522

00:59:16.850 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.3.1
00:59:16.858 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:59:16.863 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer 
00:59:17.326 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent
:::
00:59:17.522 [kafka-producer-network-thread | producer-1] TRACE o.s.kafka.core.KafkaTemplate - Sent ok
00:27:33.773 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:27:33.779 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer 
00:27:33.957 [kafka-producer-network-thread | producer-1] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test1=TOPIC_AUTHORIZATION_FAILED}
00:27:33.957 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Topic authorization failed for topics [test1]
00:27:33.958 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BgLUXrqZSLKvOw2Kn0nhVQ
00:27:33.973 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message
:::
00:27:33.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.kafka.core.KafkaTemplate - Failed to send

00:27:33.977 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE 
o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@57033d15] close(PT5S)

00:27:33.982 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent: ProducerRecord
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {

     producer.send(producerRecord, buildCallback(producerRecord, producer, future));
            if (this.autoFlush) {
                flush();
            }
            this.logger.trace(() -> "Sent: " + producerRecord);
            return future;
}

private Callback buildCallback
    return (metadata, exception) -> {
            try {
                if (exception == null) {
:::
                    KafkaTemplate.this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
                }
                else {
:::
                    KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
                }

共有1个答案

伯彦君
2023-03-14

然后我通过...

这就是您的问题--为了发送请求,需要主题的元数据

获取元数据时出错

    null
 类似资料:
  • CustomerDao.InsertCustomer调用回滚,但仍然发送了kafka消息。如果在customer事件上有一个使用者,该事件将客户插入数据仓库,则在转换回滚时,数据仓库和记录系统将不同步。有没有办法让Kafka活页夹在这里是事务性的?

  • 通过 allowNon 事务属性,可以使用相同的 KafkaTemplate 来创建事务性和非事务性生产者。这两种生产者将使用来自Kafka生产者工厂的相同配置。 在某些特定情况下,可能需要为这两种生产者使用不同的配置。 下面的例子展示了一个具体的用例 KafkaTemplate-非事务性生产者- 问题可以通过拥有2个工厂和/或2个模板来解决,但为了避免样板代码和复杂性,我们的想法是保持相同的模板

  • 我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息

  • 我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 我正在查看一些现有的代码,并想知道在下面的场景中使用Spring的@Transactional注释会发生什么?考虑以下示例: 下面的updateDataBaseItem()方法是常见的,可以从其他非事务性方法和上面的方法调用: