当前位置: 首页 > 知识库问答 >
问题:

利用RxJava避免可变状态

江煜
2023-03-14

我正在学习RxJava,但我遇到了避免可变状态的问题。

我正在解决的问题很简单:有一个条目的输入流和一个条目组的输入流。每个项目都属于一个组(有一个组标识符),并且有一些数据。每个组都有一个标识符和一些数据。许多项目可能属于同一组。目标是将这些输入流组合成一个(项、组)对的输出流,以便:

  • 只有当项目及其组都已知时,才会发出(项目,组)对
  • 每次接收更新的项目数据时,必须发出更新的(项目、组)对
  • 当接收到该组的更新数据时,必须发出与属于该组的所有项相对应的(项,组)对

下面是一个工作实现(ItemWithGroup是一个表示(项、组)对的类):

public class StateMutationWithinOperator {
    private Map<Integer, Group> allGroups = new HashMap<>();
    private Map<Integer, List<Item>> allItems = new HashMap<>();

    public Observable<ItemWithGroup> observe(Observable<Item> items, Observable<Group> groups) {
        return Observable.merge(
            items.flatMap(this::processItem), 
            groups.flatMap(this::processGroup));
    }

    private Observable<ItemWithGroup> processItem(Item item) {
        allItems.computeIfAbsent(item.groupId, missing -> new ArrayList<>())
                .add(item);

        return Observable.just(allGroups)
                .filter(groups -> groups.containsKey(item.groupId))
                .map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
    }

    private Observable<ItemWithGroup> processGroup(Group group) {
        allGroups.put(group.id, group);

        return Observable.just(allItems)
                .filter(items -> items.containsKey(group.id))
                .flatMapIterable(items -> items.get(group.id))
                .map(item -> new ItemWithGroup(item, group));
    }
}

通过避免更改存储在allGroups和allItems字段中的共享状态,我希望避免在processItem()和processGroup()中产生副作用。我如何实现这一点?

提前道谢!

共有1个答案

傅宏恺
2023-03-14

以下是根据@phoenixwang的建议进行的改进:

public class StateMutationWithinOperator {
  private Map<Integer, Group> allGroups = new HashMap<>();
  private Map<Integer, List<Item>> allItems = new HashMap<>();

  public Observable<ItemWithGroup> observe(Observable<Item> inputItems,
                                           Observable<Group> inputGroups) {
    Observable<ItemWithGroup> processedItems = inputGroups.publish(groups -> {
      groups.subscribe(group -> allGroups.put(group.id, group));

      return groups.flatMap(this::processGroup);
    });

    Observable<ItemWithGroup> processedGroups = inputItems.publish(items -> {
      items.subscribe(item
          -> allItems
          .computeIfAbsent(item.groupId, missing -> new ArrayList<>())
          .add(item));

      return items.flatMap(this::processItem);
    });

    return Observable.merge(processedItems, processedGroups);
  }

  private Observable<ItemWithGroup> processItem(Item item) {
    return Observable.just(allGroups)
        .filter(groups -> groups.containsKey(item.groupId))
        .map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
  }

  private Observable<ItemWithGroup> processGroup(Group group) {
    return Observable.just(allItems)
        .filter(items -> items.containsKey(group.id))
        .flatMapIterable(items -> items.get(group.id))
        .map(item -> new ItemWithGroup(item, group));
  }
}

它更好,但是publish()运算符仍然会使共享状态发生变化。我可以看到这样做的一个问题是,我们可能同时从两个不同的线程接收一个项目和一个组,并最终同时读写到同一个映射。

 类似资料:
  • 我目前在Android和Kotlin上使用RxJava,但我有一个问题,如果不使用toBlocking(),我无法解决。 我在员工服务中有一个方法,它返回一个可观察的 这一切都很好,因为每当员工发生变化时,这个可观察对象就会发出新的员工列表。但是我想从员工那里生成一个PDF文件,这显然不需要每次员工更改时都运行。另外,我想从PDF生成器方法返回一个可完成的对象。我想在PDF中添加一个标题,然后遍历

  • 问题内容: 我有一些这样的代码: 而且我收到警告,因为使用in闭包是可变变量,所以可能会引起问题。 我该如何避免呢?我的意思是我如何将不可变变量发送给回调,因为这是一个for循环,并且我无法更改代码?换句话说,如何将参数传递给闭包? 问题答案: 您需要创建一个范围以使用自执行功能正确捕获。这是因为整个for循环是一个作用域,也就是说,每次循环都捕获相同的变量。因此,回调将以错误的id结尾,因为的值

  • 问题内容: 我有一个宽度为70%的元素,它漂浮在一个宽度为30%的元素旁边,但是当我添加填充时,它会扩展元素并破坏格式,有没有办法使它仅增加内容距离边缘而不是仅仅扩大它? 问题答案: 使用模型时,边框大小中将包含填充。

  • 我得到警告: app.js:87486[Vue警告]:避免直接更改prop,因为每当父组件重新呈现时,该值将被覆盖。相反,使用基于道具值的数据或计算属性。被变异的道具:“喜欢计数” 我的刀锋 Vue代码

  • 可变状态 数据是不可变的, 但是通过引用实现的状态是可以改变的. Atom Atom 在 Clojure 中可以用于处理事务操作, cljs 由于是单线程, 玩不转. 不过 Atom 还是用于表示单个同步的状态修改, 用法一般是: (def *a (atom 1)) @*a (reset! *a 2) (swap! *a inc) swap! 实际上是一个 Macro, 应对 (reset! *a

  • 让我们看一下使用两种不同的方式去计算单词的个数,第一种方式使用 reduceByKey 另外一种方式使用 groupByKey: val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val