当前位置: 首页 > 知识库问答 >
问题:

为kafkatemplate异步响应创建的异步线程的最大数量是多少

袁博
2023-03-14

ForkJoinPool是用给定的目标并行级别构造的;默认情况下,等于可用处理器的数量

假设我的CPU有2个内核。那么,ForkJoinpool创建的最大线程数是4?

假设我正在执行一个异步操作,该操作在使用默认Forkpool的循环(比如10k)操作中返回一个未来对象。。。那么Forkpool将创建多少线程?

List<ListenableFuture<SendResult<String, String>>> cf = new ArrayList<ListenableFuture<SendResult<String, String>>>();

future = kafkaTemplate.send(topicName, message);
cf.add(future);

i++;

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

    @Override
    public void onSuccess(SendResult<String, String> result) {
        syso("sent success");
    }

    @Override
    public void onFailure(Throwable ex) {
        System.out.println(" sending failed");
    }
});

而且,在其他一些类中,我正在检查是否所有的未来已经完成或没有:

    for (ListenableFuture<SendResult<String, String>> m : myFutures) {
        m.get();
    }

共有1个答案

常俊爽
2023-03-14

没有额外的线程;期货在生产者的I/O线程上完成。

下面是一个显示回调的测试。。。

@SpringBootApplication
public class So61415751Application {


    private static final Logger LOG = LoggerFactory.getLogger(So61415751Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So61415751Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
                LOG.info(recordMetadata.toString());
            }
        });
        return args -> {
            IntStream.range(0, 9).forEach(i -> template.send("so61415751", "foo" + i));
            LOG.info("Sent");
            Thread.sleep(10_000);
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61415751").partitions(1).replicas(1).build();
    }

}
spring.kafka.producer.properties.linger.ms=3000

#logging.level.org.springframework.kafka=debug

logging.level.org.apache.kafka=debug

结果

2020-04-24 17:27:46.282  INFO 96084 --- [           main] com.example.demo.So61415751Application   : Sent

...

3 second linger

...

2020-04-24 17:27:49.299  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@63
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@64
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@65
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@66
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@67
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@68
2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@69
2020-04-24 17:27:49.301  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@70
2020-04-24 17:27:49.301  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@71

(线程调用ProducerListener也完成了未来)。

 类似资料:
  • 我试图找出如何在事务上下文中正确处理原子级的对Kafka的多次写入。在此场景中,事务不是由kafka消息侦听器启动的,而是通过@Transactional注释以编程方式启动的,请参见下面的代码段。 我使用的是spring boot 2.4.2和spring kafka 2.6.5。 KafkaProducer文档指出,在事务上下文中,不需要调用。get(),因为它最终会在尝试提交事务时引发异常。此

  • 然而,在我下面的代码中,我希望在这两个示例中都花费相同的15秒(每个任务5秒),如本文所述。然而,第二个示例只需要5秒,同时运行所有3个示例也需要5秒来完成第二个示例。原来的文章花了5秒,但我把它改成了1秒的延迟,让它更明显。 有没有人能解释一下这是怎么回事,为什么它看起来像线程一样运行?

  • 我正在使用来处理通过文件的大量记录。每一行都是一条记录,我将每一行传递给单独的线程进行处理,问题是我必须收集这些处理过的记录以及在处理记录时生成的更多数据,然后在最后的数据收集上应用一些业务逻辑。我将一个通用的传递给所有线程来填充已处理的数据,当我通过visualVM调试它时,我发现(屏幕截图如下)这些线程在等待中花费的时间比在运行中花费的时间多。我想这是因为一个线程在写入时获得了锁。 有没有一种

  • 我有一个spring boot微服务,我们在其中调用多个服务(比如服务a和服务B)。我试图根据某些条件在多个线程上异步调用这两个服务,一旦处理完成,我想合并服务A和服务B的响应。 我知道我们可以使用@Async异步运行一个进程,并使用ExecutorService为一个服务启动多个线程。 但是我不确定如何把所有的东西放在一起。所以在这里寻找任何建议? 我知道这在上面主要是理论上解释的,但我尝试了跟

  • 问题内容: 就像一个人在这里问到但他的解决方案是调用其他函数 …我想知道是否有可能拥有一个不调用a的函数第二个功能基于异步请求的响应,但仅当异步请求响应时。 可能是这样的: 不调用另一个函数,这有可能吗? 我要实现的目标是拥有一个可以用一些参数调用的函数,该函数将返回异步Web服务(如FB)的响应。 问题答案: 简而言之,没有。您不能让异步函数同步返回有意义的值,因为该值当时不存在(因为它是在后台

  • 我有以下代码: 它使用<code>System.Linq。异步,我发现很难理解。在“经典”/“同步LINQ”中,只有在调用<code>ToList() 我担心的另一个问题是多线程。我听过很多次异步!=多线程。那么,< code >控制台怎么可能。WriteLine(线程。CurrentThread . ManagedThreadId);打印各种id?有些id被打印了多次,但总的来说,输出中大约有5