当前位置: 首页 > 知识库问答 >
问题:

Kafka异步发送不是真正的异步?

夏俊杰
2023-03-14

我正在使用Kafka客户端1.0.0库中的KafkaProducer,根据文档,该方法是Future

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

还有其他选项是完全异步的吗?我希望它完全异步的问题是,如果bootstrap.servers中的一些服务器没有响应,它将根据max.block.ms等待时间,但我实际上并不希望它等待,而是希望它返回。

我看到它将立即返回的留档:KafkaProducer java doc

发送是异步的,一旦记录存储在等待发送的记录的缓冲区中,该方法将立即返回。这允许并行发送多个记录,而无需在每次记录之后阻塞以等待响应。


共有2个答案

江烨伟
2023-03-14

Kafka维护一个元数据缓存,偶尔更新以保持其最新,在您的场景中,只有当缓存过时或未初始化时,您才会等待。一旦缓存初始化,就没有等待。

如果您的代码有一个即将到来的send(),必须尽快执行,您可以尝试向生产者发送prepreparial partitionsFor()方法调用,以查看是否无法在需要时强制更新缓存。

除此之外,总是有可能偶尔等待元数据缓存被刷新。

微生嘉
2023-03-14

你的分析是正确的——Kafka有一个(有时)阻塞的“非阻塞”API。这是之前提过的——https://cwiki.apache.org/confluence/display/KAFKA/KIP-286:制作人send()不应阻止元数据更新,但决不应优先排序。

 类似资料:
  • 我们正在尝试使用 guzzle 执行并发异步请求。在浏览了一些资源(例如这样和这样)之后,我们提出了一些在下面共享的代码。但是,它没有按预期工作。 看起来Guzzle正在同步而不是异步地处理这些请求。 出于测试目的,我们点击一个内部url,它会Hibernate5秒钟。当并发数为10时,我们预计所有10个请求最初将被排队并几乎同时发送到服务器,在那里它们将等待5秒钟,然后几乎所有的请求将几乎同时完

  • 我试图找出如何在事务上下文中正确处理原子级的对Kafka的多次写入。在此场景中,事务不是由kafka消息侦听器启动的,而是通过@Transactional注释以编程方式启动的,请参见下面的代码段。 我使用的是spring boot 2.4.2和spring kafka 2.6.5。 KafkaProducer文档指出,在事务上下文中,不需要调用。get(),因为它最终会在尝试提交事务时引发异常。此

  • 我需要捕获异步发送到Kafka时的异常。Kafka producer Api附带一个函数send(ProducerRecord记录、回调)。但当我针对以下两种情况进行测试时: Kafka经纪人倒下 主题没有预创建回调没有被调用。相反,我在代码中收到发送不成功的警告(如下所示)。 问题: > 那么回调是否只针对特定的异常调用? Kafka客户端何时尝试在异步发送时连接到Kafka代理:每次批处理发送

  • 问题内容: 我有这样的表格: 我想异步发送这些数据,使用jQuery函数。 编辑:与解决方案: 问题答案: 看到: jQuery文档:发布 jQuery文档:序列化

  • 我希望我的请求触发一些长时间运行的操作,这些操作应该在后台执行。我编写了以下实现,应该在后台处理我的操作,但实际上我的请求是同步执行的: 在日志中,我看到以下内容: 我看到我的在另一个线程中执行,但出于某种原因,我的原始请求等待sleep完成 更新1:

  • 在日志中,我有将近1秒(~800毫秒)的值,为什么会有这么长的执行时间?