在以下两个示例中,处理通量流的行为似乎不同。
示例1:
public static void main(String[] args) throws InterruptedException {
log.debug(" Before reading flux stream");
Flux<Customer> results = readFluxStream();
log.debug(Thread.currentThread().getName() + " : After reading flux stream");
results.subscribe(new LearnFlux().new CustomerConsumer());
Thread.sleep(5000);
log.debug(" Exit Main Thread ");
}
public static Flux<Customer> readFluxStream() {
List<Customer> customers = buildCustomers();
Customer[] customerArray = new Customer[customers.size()];
customerArray = customers.toArray(customerArray);
Flux<Customer> temp = Flux.fromArray(customerArray).delayElements(Duration.ofSeconds(1)).log();
return temp;
}
private class CustomerConsumer implements Consumer<Customer> {
@Override
public void accept(Customer customer) {
log.debug(Thread.currentThread().getName() + " This is a consumer " + customer.getFirstName());
}
}
从下面的日志中,我们了解到Flux消费者正在不同的线程中运行。(在 *** ). 我在主线程中引入了睡眠,以便可以在控制台中捕获消费者日志。
19:07:24.695 [***main***] DEBUG com.learnjava.LearnFlux - Before reading flux stream
19:07:24.759 [***main***] DEBUG reactor.util.Loggers - Using Slf4j logging framework
19:07:24.779 [***main***] DEBUG com.learnjava.LearnFlux - main : After reading flux stream
19:07:24.788 [***main***] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
19:07:24.790 [***main***] INFO reactor.Flux.ConcatMap.1 - request(unbounded)
19:07:25.821 [***parallel-1***] INFO reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Tom', lastName='Cruise'])
19:07:25.835 [***parallel-1***] DEBUG com.learnjava.LearnFlux - parallel-1 This is a consumer Tom
19:07:26.841 [***parallel-2***] INFO reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Jim', lastName='Carry'])
19:07:26.842 [***parallel-2***] DEBUG com.learnjava.LearnFlux - parallel-2 This is a consumer Jim
19:07:26.844 [***parallel-2***] INFO reactor.Flux.ConcatMap.1 - onComplete()
19:07:29.817 [***main***] DEBUG com.learnjava.LearnFlux - Exit Main Thread
示例2
public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
@Query("SELECT * FROM customer WHERE last_name = :lastname")
Flux<Customer> findByLastName(String lastName);
}
public class CustomerConsumer implements Consumer<Customer> {
private static final Logger log = LoggerFactory.getLogger(CustomerConsumer.class);
@Override
public void accept(Customer customer) {
log.info(" This is a concusmer " + customer);
}
}
log.info(" Invoking R2DBC flux response ");
Flux<Customer> customers = repository.findAll();
customers.subscribe(new CustomerConsumer());
log.info("complete consumer in main thread");
从下面的日志中,我们观察到消费者在同一个主线程中运行。(在***中突出显示)
[***main***] Invoking R2DBC flux response
[***main***] This is a concusmer Customer[id=1, firstName='Jack', lastName='Bauer']
[***main***]This is a concusmer Customer[id=2, firstName='Chloe', lastName='O'Brian']
[***main***] This is a concusmer Customer[id=3, firstName='Michelle', lastName='Dessler']
[***main***] complete consumer in main thread
澄清:
为什么第一个示例中的Flux消费者在不同的线程中运行,因为基于R2DBC的存储库(第二个示例)返回的Flux在同一个主线程中处理?
为什么第一个示例中的Flux消费者在不同的线程中运行,因为基于R2DBC的存储库(第二个示例)返回的Flux在同一个主线程中处理?
这里的关键理解是,任何反应式操作符都可以根据需要切换线程(或者更准确地说,调度程序)。虽然大多数操作员不切换,但基于时间的操作员必须切换,并且他们默认使用并行调度程序。
在第一个示例中,您使用了delayElements()
运算符。由于它是一个基于时间的操作符,默认情况下,它会切换到并行调度程序,然后并行调度程序在并行执行器(以及您在日志中看到的并行线程)上运行。基于时间的调度程序必须切换为“即时”调度程序,这将使您的操作保持在同一个线程上,无法进行基于时间的调度(这是delayElements操作员所需要的。)
这并不是说如果你有特殊的理由不使用并行调度程序,那么你就必须使用并行调度程序——有一个重载可以让你随意设置它。如果您使用.delayElements(持续时间秒(1)),则调度程序。boundedElastic())例如,您将看到日志将显示正在使用的绑定弹性线程池。
相反,在第二个R2DBC示例中,没有操作员将其从即时调度器中切换。正如您从日志中看到的那样,它将只在主线程上运行。
若你们想更深入地了解线程在Reactor中的工作原理,西蒙的通量飞行演讲非常值得一看:https://m.youtube.com/watch?v=sNgTTcG-fEU-还有一些附带的博客帖子。
服务B可以从Eureka服务器通过服务发现获得服务A的url和portno。服务B注册尤里卡服务器有什么意义?
我对Kafka有一个概念上的问题。 我们有许多机器在一个主题上充当消费者,有许多分区。这些机器运行在不同的硬件设置上,将会有比其他机器具有更高吞吐量的用户。 现在,使用者和一个或多个分区之间存在直接的相关性。
我目前正在探索Kafka,作为一个简单问题的初学者。 将有一个生产者向一个主题推送消息,但将有n个spark应用程序的消费者从kafka发送消息并插入到数据库中(每个消费者插入到不同的表中)。 是否有可能消费者会不同步(例如消费者的某些部分会停机很长一段时间),然后一个或多个消费者不会处理消息并插入到表中? 假设代码总是正确的,在按摩数据时不会出现异常。重要的是每条消息只处理一次。 我的问题是,K
我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线
我想用C#听Kafka主题的消息。 与Java中一样,还有一个注释@KafkaListener,当添加到函数上方时,它会侦听来自主题的消息,然后执行函数的逻辑。 示例-@KafkaListener(topics=“topicname”,groupId=“groupId”)//这里的函数代码 同样,C#中是否有使用Confluent的注释。Kafka
我有一个SOAP Web服务,它发送一个kafka请求消息,并等待一个kafka响应消息(例如,consumer.poll(10000))。 每次调用web服务时,它都会创建一个新的Kafka生产者和一个新的Kafka消费者。 每次调用web服务时,使用者都会收到相同的消息(例如,具有相同偏移量的消息)。 我使用的是Kafka0.9,启用了自动提交,并且自动提交频率为100毫秒。 更新0001