我正在学习RxJava,但我遇到了避免可变状态的问题。
我正在解决的问题很简单:有一个条目的输入流和一个条目组的输入流。每个项目都属于一个组(有一个组标识符),并且有一些数据。每个组都有一个标识符和一些数据。许多项目可能属于同一组。目标是将这些输入流组合成一个(项、组)对的输出流,以便:
下面是一个工作实现(ItemWithGroup是一个表示(项、组)对的类):
public class StateMutationWithinOperator {
private Map<Integer, Group> allGroups = new HashMap<>();
private Map<Integer, List<Item>> allItems = new HashMap<>();
public Observable<ItemWithGroup> observe(Observable<Item> items, Observable<Group> groups) {
return Observable.merge(
items.flatMap(this::processItem),
groups.flatMap(this::processGroup));
}
private Observable<ItemWithGroup> processItem(Item item) {
allItems.computeIfAbsent(item.groupId, missing -> new ArrayList<>())
.add(item);
return Observable.just(allGroups)
.filter(groups -> groups.containsKey(item.groupId))
.map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
}
private Observable<ItemWithGroup> processGroup(Group group) {
allGroups.put(group.id, group);
return Observable.just(allItems)
.filter(items -> items.containsKey(group.id))
.flatMapIterable(items -> items.get(group.id))
.map(item -> new ItemWithGroup(item, group));
}
}
通过避免更改存储在allGroups和allItems字段中的共享状态,我希望避免在processItem()和processGroup()中产生副作用。我如何实现这一点?
提前道谢!
以下是根据@phoenixwang的建议进行的改进:
public class StateMutationWithinOperator {
private Map<Integer, Group> allGroups = new HashMap<>();
private Map<Integer, List<Item>> allItems = new HashMap<>();
public Observable<ItemWithGroup> observe(Observable<Item> inputItems,
Observable<Group> inputGroups) {
Observable<ItemWithGroup> processedItems = inputGroups.publish(groups -> {
groups.subscribe(group -> allGroups.put(group.id, group));
return groups.flatMap(this::processGroup);
});
Observable<ItemWithGroup> processedGroups = inputItems.publish(items -> {
items.subscribe(item
-> allItems
.computeIfAbsent(item.groupId, missing -> new ArrayList<>())
.add(item));
return items.flatMap(this::processItem);
});
return Observable.merge(processedItems, processedGroups);
}
private Observable<ItemWithGroup> processItem(Item item) {
return Observable.just(allGroups)
.filter(groups -> groups.containsKey(item.groupId))
.map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
}
private Observable<ItemWithGroup> processGroup(Group group) {
return Observable.just(allItems)
.filter(items -> items.containsKey(group.id))
.flatMapIterable(items -> items.get(group.id))
.map(item -> new ItemWithGroup(item, group));
}
}
它更好,但是publish()运算符仍然会使共享状态发生变化。我可以看到这样做的一个问题是,我们可能同时从两个不同的线程接收一个项目和一个组,并最终同时读写到同一个映射。
我目前在Android和Kotlin上使用RxJava,但我有一个问题,如果不使用toBlocking(),我无法解决。 我在员工服务中有一个方法,它返回一个可观察的 这一切都很好,因为每当员工发生变化时,这个可观察对象就会发出新的员工列表。但是我想从员工那里生成一个PDF文件,这显然不需要每次员工更改时都运行。另外,我想从PDF生成器方法返回一个可完成的对象。我想在PDF中添加一个标题,然后遍历
问题内容: 我有一些这样的代码: 而且我收到警告,因为使用in闭包是可变变量,所以可能会引起问题。 我该如何避免呢?我的意思是我如何将不可变变量发送给回调,因为这是一个for循环,并且我无法更改代码?换句话说,如何将参数传递给闭包? 问题答案: 您需要创建一个范围以使用自执行功能正确捕获。这是因为整个for循环是一个作用域,也就是说,每次循环都捕获相同的变量。因此,回调将以错误的id结尾,因为的值
问题内容: 我有一个宽度为70%的元素,它漂浮在一个宽度为30%的元素旁边,但是当我添加填充时,它会扩展元素并破坏格式,有没有办法使它仅增加内容距离边缘而不是仅仅扩大它? 问题答案: 使用模型时,边框大小中将包含填充。
我得到警告: app.js:87486[Vue警告]:避免直接更改prop,因为每当父组件重新呈现时,该值将被覆盖。相反,使用基于道具值的数据或计算属性。被变异的道具:“喜欢计数” 我的刀锋 Vue代码
可变状态 数据是不可变的, 但是通过引用实现的状态是可以改变的. Atom Atom 在 Clojure 中可以用于处理事务操作, cljs 由于是单线程, 玩不转. 不过 Atom 还是用于表示单个同步的状态修改, 用法一般是: (def *a (atom 1)) @*a (reset! *a 2) (swap! *a inc) swap! 实际上是一个 Macro, 应对 (reset! *a
让我们看一下使用两种不同的方式去计算单词的个数,第一种方式使用 reduceByKey 另外一种方式使用 groupByKey: val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val