我使用publishOn和subscribeOn的流量相同,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
虽然,当我使用两者时,日志中不会打印任何内容。但是当我只使用publishOn时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
publishOn比Subscribeon更受推荐吗?或者它比subscribeon有更多的偏好?两者之间的区别是什么,什么时候使用哪个?
我花了一些时间来理解它,可能是因为publishon
通常是在subscribeon
之前解释的,这里有一个更简单的外行解释。
subscribeOn
意味着在指定的调度器工作线程(其他线程)上运行初始源发射,例如subscribe()、onSubscribe()和request()
,对于任何后续操作也是如此,例如onnext/onerror/oncomplete、map等
,无论subscribeOn()的位置如何,都会发生这种行为
如果在fluent调用中没有执行任何publishon
,那么就这样,所有内容都将在这样的线程上运行。
但是,如果在中间调用publishon()
,那么任何后续的操作符调用都将在提供的调度器工作程序上运行到这样的publishon()
。
这里有一个例子
java prettyprint-override">Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();
结果会是
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5
正如您所看到的,第一个doonnext()
和下面的map()
在名为subscribeon_thread
的线程上运行,直到调用任何publishon()
,然后任何后续调用都将在为该publishon()
提供的调度程序上运行,并且任何后续调用都将再次发生,直到任何人调用另一个publishon()
。
Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式, 下文先讨论订阅/发布到频道的实现, 再讨论订阅/发布到模式的实现。 频道的订阅与信息发送 Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时, 信息就会被发送给所有订阅指定频道的客户端
发布(Publication)和订阅(Subscription)是 Meteor 的最基本最重要的概念之一,但是如果你是刚刚开始接触 Meteor 的话,也是有些难度的。 这已经导致不少误解,比如认为 Meteor 是不安全的,或者说 Meteor 应用无法处理大量数据等等。 人们起初会感觉这些概念很迷惑很大程度上是因为 Meteor 像变魔法一样替你做了很多事儿。尽管这些魔法最终看起来很有效,但
Note 本文档翻译自: http://redis.io/topics/pubsub 。 SUBSCRIBE 、 UNSUBSCRIBE 和 PUBLISH 三个命令实现了发布与订阅信息泛型(Publish/Subscribe messaging paradigm), 在这个实现中, 发送者(发送信息的客户端)不是将信息直接发送给特定的接收者(接收信息的客户端), 而是将信息发送给频道(chann
简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息
问题内容: 我一直在研究不同的nodeJS发布/订阅实现,并想知道哪种方法最适合特定的应用程序。该应用程序的要求涉及多通道,多用户3D环境中对象的实时同步。 我从使用socket.io开始,创建了一个基本的通道数组,当用户发送消息时,它遍历该通道中的用户并将消息发送到用户的客户端。这很好用,我对此没有任何问题。 为了保持对象的持久性,我使用node_redis添加了Redis支持。然后,我将通道数
主要内容:发布/订阅流程,常用命令汇总,基本命令应用Redis PubSub 模块又称发布订阅者模式,是一种消息传递系统,实现了消息多播功能。发布者(即发送方)发送消息,订阅者(即接收方)接收消息,而用来传递消息的链路则被称为 channel。在 Redis 中,一个客户端可以订阅任意数量的 channel(可译为频道)。 消息多播:生产者生产一次消息,中间件负责将消息复制到多个消息队列中,每个消息队列由相应的消费组进行消费,这是分布式系统常用的