当前位置: 首页 > 知识库问答 >
问题:

KafkaTemplate是事务发送同步还是异步?

司徒宇
2023-03-14

我试图找出如何在事务上下文中正确处理原子级的对Kafka的多次写入。在此场景中,事务不是由kafka消息侦听器启动的,而是通过@Transactional注释以编程方式启动的,请参见下面的代码段。

我使用的是spring boot 2.4.2和spring kafka 2.6.5。

KafkaProducer文档指出,在事务上下文中,不需要调用。get(),因为它最终会在尝试提交事务时引发异常。此外,KafkaTemplate呼吁。KafkaProducer返回的Future上的get(),因此它看起来是同步的。

  @PostMapping("/ingest/{topic}")
  public ResponseEntity ingest(@PathVariable(value = "topic") String topic, @RequestBody String numbersString) {

    List<String> numbers = Arrays.stream(numbersString.split(",")).collect(Collectors.toList());
    kafkaWriterService.writeMany(numbers,topic); 
    return ResponseEntity.ok().build();
  }

  @Transactional
  @Service
  class KafkaWriterService {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void writeMany(List<String> messages, String topic) {

      for (String message : messages) {
        kafkaTemplate.send(message, topic, topic);
      }
    }
  }

所以据我所知下面的KafkaTemplate方法

protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord)

等待生产者完成发送,然后返回另一个ListenableFutre,此接口是异步的。

那么这是同步的,因为我们处于事务上下文中,还是我应该等待kafkaTemplate返回的所有ListenableFutures结束?我的意思是考虑到我需要以同步方式回复调用者。

谢谢,谢谢

共有1个答案

盖锐
2023-03-14

不;它只调用get()如果未来立即由发送完成...

        Future<RecordMetadata> sendFuture =
                producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
        // May be an immediate failure
        if (sendFuture.isDone()) {
            try {
                sendFuture.get();
            }
...

在实际执行发送之前,客户端中会出现某些错误。

看见https://github.com/spring-projects/spring-kafka/issues/1437

 类似资料:
  • 我目前正在阅读Trevor Burnham的Async Javascript。到目前为止这是一本很棒的书。 他谈到这个片段和console.log在Safari和Chrome控制台中是“异步”的。不幸的是我无法复制这个。代码如下: 如果这是异步的,我会预期结果是books的结果。将console.log()放在事件队列中,直到所有代码执行完毕,然后运行它,它将具有bar属性。 虽然它是同步运行的,

  • 问题内容: 我有一张上面有插入触发器的表。如果我从存储过程中的一条插入语句中向该表中插入6000条记录,那么在插入触发器完成之前,存储过程会返回吗? 只是为了确保我在正确地思考,触发器应该只被调用一次(我知道“被调用”不是正确的词)一次,因为只有1个insert语句,对吗? 我的主要问题是:即使触发器尚未完成,存储过程也会完成吗? 问题答案: 您的插入触发器将针对整个插入语句运行一次。这就是为什么

  • 问题内容: 我意识到这是一个基本问题,但是我在其他地方找不到答案。 是同步还是异步? 万一它是异步的,是否有可能在传播了动作之后添加一个回调,这是可能的? 问题答案: AFAIK,调度动作是同步的。如果您愿意解决异步调用,则可以在redux中使用thunk- middleware ,其中将dispatch作为回调函数提供,您可以根据需要调用它。

  • 问题内容: 我对诺言有很多困惑。是同步还是异步? 问题答案: 传递函数 为 无极构造同步运行,但任何依赖于它的分辨率将异步调用。即使promise立即解决,任何处理程序都将异步执行(类似于when )-主线程首先运行到末尾。 不管您的Javascript环境如何,都是如此-无论您是在Node还是浏览器中。

  • fixedDelay,任务总是等待,直到上一个任务完成。 上面的cron将每五分钟执行一次,我的问题是:@scheduled cron是否会等待前一个任务完成后再触发下一个作业?