当前位置: 首页 > 知识库问答 >
问题:

反应式存储库

芮岳
2023-03-14

我尝试创建一些基于反应式堆栈(反应器WebFlux)的基本Spring 5应用程序。

我的下一个目标是实现能够:

  1. 保存书籍。
  2. 查找所有图书。

我的存储库需要涵盖以下场景:

方案 A:

  1. 没有人订阅FindAll
  2. 有人保存了一本书(id=1)
  3. Client1订阅FindAll
  4. 书籍(id=1)被推送到Client1(Client1保持订阅状态,流未完成!)
  5. 有人保存了一本书(id=2)
  6. 书籍(id=2)被推送到Client1(Client1保持订阅状态,流未完成!)

所以,在我看来,这个场景是冷源和热源概念的混合。在任何人订阅之前,我们会收集某人保存在我们存储库中某个缓冲区中的数据(比如普通列表)。对于将订阅FindAll的所有订阅者,我们需要推送缓冲列表(在他订阅之前收集的),并且不要完成流以允许推送以后的集合更新。

我可以做到这一点,但我仍然在想有没有更简单的方法来做到这一点?也许Reactor项目中有一个解决方案已经涵盖了这种情况?

我的实现:

public class InMemoryBookRepository {

private final Map<String, Book> bookMap = new ConcurrentHashMap<>();
private final UnicastProcessor<Book> processor = UnicastProcessor.create();
private final FluxSink<Book> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Book> hotFlux = processor.publish().autoConnect();

@Override
public void save(Book book) {
    bookMap.put(book.getId(), book);
    fluxSink.next(book);
}

@Override
public Flux<Book> findAll() {
    //without fromIterable I cannot push books that where saved before someone subscribed
    return Flux.fromIterable(bookMap.values())
            .concatWith(hotFlux)
            //Unfortunately this solution produces duplicates so we need to filter them
            .distinct();
}
}

Ofc,我不能只使用冷出版商 - 因为流将在出版收集的书籍后完成。出于同样的原因,我不能使用Hot one,因为我会错过在某人订阅之前生成的元素。

旁注:在我的代码中,我的地图没有任何清理机制,所以它会在某个时候产生异常,但这目前并不重要。

共有1个答案

西门马鲁
2023-03-14

这么简单……不知道为什么错过了这个漂亮的接线员:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#replay--

所以基本上删除整个List/Map部分代码并使用replay()而不是发布()

简化示例:

UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> fluxSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
//change 'publish()' to 'replay()'
Flux<String> hotFlux = processor.publish().autoConnect(); 

hotFlux.subscribe(n -> log.info("1st subscriber: {}", n));

fluxSink.next("one");

hotFlux.subscribe(n -> log.info("2nd subscriber: {}", n));

fluxSink.next("two");

输出与发布()

1st subscriber: one
1st subscriber: two
2nd subscriber: two

使用replay()输出:

1st subscriber: one
2nd subscriber: one
1st subscriber: two
2nd subscriber: two

 类似资料:
  • 正在执行查询,但未得到任何结果。 路由器:-api/v1/service/appt/usr/{usr_id} 存储库代码, 从日志中查询, 项目表中的数据, 邮递员请求, 不确定这有什么问题。它不会返回记录。

  • 你好,我是新的反应原生,我试图安装反应原生sqlite存储改变安装,我按照Android安装的所有指令:[https://www.npmjs.com/package/react-native-sqlite-storage][1] 我尝试使用命令:react native run android运行它,但每次我都会遇到错误: 失败:生成失败,出现异常。错误:无法确定任务“:app:mergeDebu

  • 使用分页已经有问题了。接受的解决方案适用于采用格式的查询。项目文档还引用了以下用法: https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#mongo.reactive.repositories.usage 例158。用于持久化个人实体的基本存储库接口 问题是这对findAll不起作用: 例外情况: Spri

  • 这里已经提出并回答了类似的问题。解决方案是将日志记录级别从组织:: 我的情况的不同之处在于,我使用的是被动支持,上面的坏男孩不起作用。我还尝试将 中的所有内容都设置为 DEBUG,但仍然无法在日志中看到任何查询。 我想反应式存储库有一些特别的地方,我没有提到。任何想法都非常欢迎!

  • 当我启动应用程序时,我得到了错误我的Pom有以下依赖项,我使用的是 下面是我的存储库接口。 当我启动应用程序时,它会抛出错误: 可以有人请建议如果JPA不支持,然后我应该使用什么,任何帮助都是感激的…

  • 问题内容: 以下是React中的反模式吗?我喜欢这种模式,因为当实例化一个组件时,它在静态函数中为我提供了上下文。然后,我可以导入该类并调用静态方法来修改状态。还是可以通过更好的方式来完成? 问题答案: 显然,这取决于条件,可能是一种反模式,也可能是一个错误。静态类方法不应与类实例一起使用。绑定到特定的组件实例和用途,这只能证明类是单例是合理的(尽管单例也经常是反模式)。如果期望有多个类实例,那么