当前位置: 首页 > 知识库问答 >
问题:

关于Project Reactor平面地图中线程的困惑

夏飞掣
2023-03-14
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> {
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        })
                        //.publishOn(single)
                        .flatMap(savedPerson -> {
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        })
                        //.publishOn(single)
                        .flatMap(foundPerson -> {
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        })
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}

因此,将此运算符放在链中的任何位置也会影响onnext/onerror/oncomplete信号的执行*上下文,从链的开始直到下一次出现{@link publishOn(Scheduler)publishOn}

这让我有点困惑,因为当处理链中没有指定任何publishon时,线程名称的打印值为:

从线程single-scheduler-1中保存person-如预期

从线程线程-13查找人员

从线程线程-6查找人员

从线程线程-15查找人员

共有1个答案

刘承运
2023-03-14

这个人为的例子可能会让它更清楚:

Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> {
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
        "Subscription from thread %s", Thread.currentThread().getName())));

它将给出类似于:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4

或者,换句话说,您的反应存储库不是在同一个调度程序上发布的,这就是为什么您会看到您所做的行为。“直到下一次出现publishon()”并不意味着下次代码调用publishon()时,它也可以位于任何flatmap()调用中的任何发布服务器中,您无法控制这些调用。

 类似资料:
  • 但后者给了我下面的例外。这是为什么? java.lang.StringIndexOutOfBoundsException:String index超出范围:1 java.lang.StringIndexOutOfBoundsException:String index超出范围:1 java.lang.String.charat(String.java:658)在Scala.Collection.i

  • 我在处理这个Leetcode问题,它要求对树进行扁平化(请参见:https://Leetcode.com/problems/flatten-binary-tree-to-linked-list),这就是我的代码,但我很困惑为什么输出返回的是相同的原始树。我不是直接修改树本身来返回吗?我好像少了什么?

  • 我是Android的新手,我对服务和线程的生命周期感到很困惑。 假设我有一个活动,它通过调用startService()在工作线程中启动服务。根据谷歌文档,即使调用组件被破坏,该服务也将无限期运行。 所以我的问题是:如果应用程序进程被完全销毁(退出并从后台堆栈中清理),服务是否仍在运行?UI线程是否仍在运行?如果是,这是否意味着即使其调用进程被破坏,线程也不一定会被终止?

  • 问题内容: 我对线程安全性有疑问。据我所知,SimpleDateFormat不是线程安全的。我想知道如果在spring控制器中以以下方式使用它会产生什么影响: 稍后在我的控制器功能中,我将其如下使用: 然后将calcDate添加到我的模型对象中,并返回ModelAndView。 那么用这种方式我会看到什么样的问题呢?是否只需删除static关键字即可解决任何问题,因为每个线程将使用其自己的date

  • 一面8.29 项目介绍,项目的难点,怎么解决的,以及项目中的一些技术问题,是否涉及音视频渲染,做的二维还是三维的,介绍一个深度学习的比赛,遇到的困难,怎么解决的,你负责那些工作,至此约二十多分钟 介绍下QT的信号和槽,信号和槽的优缺点,答松耦合,追问松耦合的体现形式,继续追问如果让你实现信号和槽你会怎么实现,答哈希表,追问如果使用哈希表会出现什么问题 问C++多态,追问多态在软件框架中的好处,继续

  • (预测算法工程师) 1.自我介绍 2.手撕:给定一个数组,求子数组中的最大和,相邻元素不能选择。 3.手撕:矩阵路径 4.解释一下多模态轨迹预测,输出如何体现的轨迹 5.如果多数车辆都是保持直行的话,输出的多模态有没有可能是3条直行轨迹,怎么解决。如何提升轨迹预测模型的预测效果 6.说一下multipath 7.注意力机制的计算过程 8.transformer中addnorm的作用 9.trans