更具体的问题是为什么通量。fromIterable()不适用于ReactorNetty HttpClient?这个简单的例子很好用。所有10项均由Flux publisher发出:
public class ConcurrentLinkedQueueFluxTest {
public static final void main(String[] args) {
List<Integer> aList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
aList.stream().forEach(i -> clq.add(i));
Flux.fromIterable(clq)
.subscribe(new ReactiveSubscriber<Integer>());
}
}
使用ReactiveSubscriber:
class ReactiveSubscriber<T> extends BaseSubscriber<T> {
private Subscription subscription;
@Override
public void hookOnSubscribe(Subscription s) {
System.out.println("In hookOnSubscribe");
this.subscription = s;
subscription.request(1);
}
@Override
public void hookOnNext(T response) {
System.out.println("In hookOnNext: "+response.toString());
subscription.request(1);
}
@Override
public void hookOnError(Throwable t) {
System.out.println(t.getLocalizedMessage());
}
@Override
public void hookOnComplete() {
System.out.println("In hookOnComplete");
subscription.cancel();
}
}
如下图所示,如果我在HttpClient中使用类似的订阅服务器。send(Flux.fromIterable())仅发出1项,而不是队列中的所有项。因此,由于这种流量,有些配置不正确。创建通量的fromIterable()方法不适用于HttpClient。
对于实际的生产代码问题,我包括了队列定义、客户机方法、订阅者、服务器方法和一个日志,该日志显示从客户机到服务器只发送了5项中的1项。看起来,即使HttpClient send()方法有一个从队列加载的Flux对象,也只发送一个项目,尽管队列有5个项目。
要发送到服务器的项目位于ByteBuf类型的队列中:
private ConcurrentLinkedQueue<ByteBuf> electionRequestQueue;
public ElectionTransactionRequest() {
electionRequestQueue = new ConcurrentLinkedQueue<ByteBuf>();
}
客户端方法是:
public void task() {
log.debug("Queue size: "+electionRequestQueue.size());
ElectionTransactionSubscriber etSubscriber = new ElectionTransactionSubscriber();
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
.port(61005)
.protocol(HttpProtocol.HTTP11)
.post()
.uri("/election/transaction")
.send(Flux.fromIterable(electionRequestQueue))
.responseContent()
.aggregate()
.asByteArray()
.subscribe(etSubscriber);
}
订阅者定义为:
class ElectionTransactionSubscriber extends BaseSubscriber<byte[]> {
private static final Logger log = LoggerFactory.getLogger(ElectionTransactionSubscriber.class);
private Subscription subscription;
@Override
public void hookOnSubscribe(Subscription s) {
log.debug("In hookOnSubscribe");
this.subscription = s;
subscription.request(1);
}
@Override
public void hookOnNext(byte[] response) {
log.info("In hookOnNext");
subscription.request(1);
}
@Override
public void hookOnError(Throwable t) {
log.error(t.getLocalizedMessage());
}
@Override
public void hookOnComplete() {
log.debug("In hookOnComplete");
subscription.cancel();
}
}
服务器端在方法中定义:
public void start() {
disposableServer =
HttpServer.create()
.host("localhost")
.port(61005)
.protocol(HttpProtocol.HTTP11)
.route(routes ->
routes
.post("/election/transaction",
(request, response) -> response.send(request
.receive()
.aggregate()
.flatMap(aggregatedBody ->
electionTransactionHandler.electionTransactionResponse(aggregatedBody)))))
.bindNow();
disposableServer.onDispose().block();
}
当客户端运行时,队列中有5个项目,但只有一个项目被发送到服务器,如日志中所示。在订阅者中,仅发送来自Flux发布者的1个项目后,就会调用方法hookOn完成()。
2020-12-01 12:03:56,442 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionRequest: Queue size: 5
2020-12-01 12:03:56,539 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnSubscribe
2020-12-01 12:03:56,746 INFO [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnNext
2020-12-01 12:03:56,746 DEBUG [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnComplete
缓冲可能是您最容易使用的。它会发出缓冲区(默认情况下是Collection,List)。因此,在您的情况下,订阅者会收到一个列表
StepVerifier.create(
Flux.range(1, 10)
.buffer(5, 3) //overlapping buffers
)
.expectNext(Arrays.asList(1, 2, 3, 4, 5))
.expectNext(Arrays.asList(4, 5, 6, 7, 8))
.expectNext(Arrays.asList(7, 8, 9, 10))
.expectNext(Collections.singletonList(10))
.verifyComplete();
=========================
这是直接从Reactor核心留档:
三种配料
当你有很多元素并且你想把它们分批处理时,你在Reactor中有三种广泛的解决方案:分组、窗口和缓冲。这三者在概念上是相近的,因为它们将通量重新分配到聚合中。分组和窗口创建通量
分组是将源通量拆分为多个批次的行为,每个批次匹配一个键。
关联的运算符是groupBy。
每个组都表示为GroupeFlux,它允许您通过调用其key()方法来检索密钥。
组的内容没有必要的连续性。一旦源元素生成一个新密钥,该密钥的组就会打开,与该密钥匹配的元素最终会在组中(可以同时打开多个组)。
这意味着群体:
Are always disjoint (a source element belongs to one and only one group).
Can contain elements from different places in the original sequence.
Are never empty.
以下示例根据值是偶数还是奇数对其进行分组:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
.map(String::valueOf) //map to string
.startWith(g.key())) //start with the group's key
)
.expectNext("odd", "1", "3", "5", "11", "13")
.expectNext("even", "2", "4", "6", "12")
.verifyComplete();
警告分组最适合中低数量的组。组还必须被强制消耗(例如被平面图消耗),以便groupBy继续从上游获取数据并提供更多组。有时,这两个约束会成倍增加并导致挂起,例如当您具有高基数并且消耗组的平面图的并发过低时。使用Flux窗口
窗口化是根据大小、时间、边界定义谓词或边界定义发布者的标准将源通量拆分为窗口的行为。
相关联的运算符是windows、windowTimeout、window直到、windows同时和windows何时。
与根据传入键随机重叠的groupBy相反,窗口(大部分时间)是按顺序打开的。
不过,有些变体仍然可以重叠。例如,在window(int maxSize,int skip)中,maxSize参数是窗口关闭后的元素数,skip参数是源中打开新窗口后的元素数。所以如果maxSize
以下示例显示了重叠窗口:
StepVerifier.create(
Flux.range(1, 10)
.window(5, 3) //overlapping windows
.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
)
.expectNext(1, 2, 3, 4, 5)
.expectNext(4, 5, 6, 7, 8)
.expectNext(7, 8, 9, 10)
.expectNext(10)
.verifyComplete();
注:反向配置(最大尺寸
在基于谓词的窗口通过windows和windows的情况下,具有与谓词不匹配的后续源元素也可能导致空窗口,如以下示例所示:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.windowWhile(i -> i % 2 == 0)
.concatMap(g -> g.defaultIfEmpty(-1))
)
.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
.expectNext(2, 4, 6) // triggered by 11
.expectNext(12) // triggered by 13
// however, no empty completion window is emitted (would contain extra matching elements)
.verifyComplete();
用焊剂缓冲
缓冲类似于窗口,但有以下扭曲:它不发射窗口(每个窗口都是通量),而是发射缓冲区(每个窗口都是集合) — 默认情况下,列表)。
缓冲的运算符与窗口的运算符类似:buffer、bufferTimeout、bufferUntil、bufferWhile和bufferWhen。
当相应的窗口操作符打开一个窗口时,缓冲操作符创建一个新集合并开始向其中添加元素。当窗口关闭时,缓冲操作符发出收集。
缓冲也可能导致源元素丢失或缓冲区重叠,如下例所示:
StepVerifier.create(
Flux.range(1, 10)
.buffer(5, 3) //overlapping buffers
)
.expectNext(Arrays.asList(1, 2, 3, 4, 5))
.expectNext(Arrays.asList(4, 5, 6, 7, 8))
.expectNext(Arrays.asList(7, 8, 9, 10))
.expectNext(Collections.singletonList(10))
.verifyComplete();
与窗口化不同,缓冲区直到和缓冲区虽然不发出空缓冲区,如以下示例所示:
StepVerifier.create(
Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
.bufferWhile(i -> i % 2 == 0)
)
.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
.expectNext(Collections.singletonList(12)) // triggered by 13
.verifyComplete();
https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/advancedFeatures.adoc
我使用的是Netty 3.9.5,我有一个简单的客户机-服务器设置,我从http://en.wikipedia.org/wiki/Netty_(软件)#Netty\u TCP\u示例。我扩展了这个示例,将Java search plan对象从客户端发送到服务器。在这个网站上跟踪用户的帮助下,我已经能够让这个程序按预期运行。 现在,我想让我的读卡器/服务器程序同时接受多个客户端。我想我会使用下面列出
我将创建一个身份验证服务器,它本身与一组不同的Oauth2.0服务器交互。Netty似乎是在这里实现网络部分的一个很好的候选者。但在开始之前,我需要澄清一些关于netty的细节,因为我是新手。例行程序如下: > < li> 服务器接受来自客户端的HTTPS连接。 然后,不关闭第一个连接,它通过HTTPS与远程OAuth2.0服务器建立另一个连接并获取数据 毕竟,服务器将结果发送回客户端,客户端应该
我有一个基本的Netty服务器(来自教程)(http://netty.io/wiki/user-guide-for-4.x.html),它会接收来自客户端的请求,但如何向客户端发送字符串呢? 例如,在普通的Minecraft服务器上,您在配置文件中指定“MOTD”,当客户端从服务器列表中发出信号时,它将显示该字符串。我需要做同样的事情,但是从我的服务器代码中。
我已经编写了小型Java 7客户端和服务器应用程序。我有3个自签名的X.509 RSA证书的密钥库。当客户端通过SSL连接时,服务器只发送一个证书的SSL证书消息。我对SSL/TLS有点陌生。我还研究了JSSE代码sun。安全ssl。X509KeyManagerImpl,并找到以下注释: 评论很清楚,服务器将发送单个最佳匹配证书,但我似乎不理解原因。就像在我的例子中一样,我希望服务器发送所有3个证
当涉及到TCP时,Netty确实有很好的文档记录,但我想尝试一个简单的UDP服务器-客户机示例,但没有找到任何好的代码。(主要是邮件列表和据称有错误代码的用户) 有人愿意提供一些简单的例子吗?谢谢!
问题内容: 我将实现类似于Facebook通知和此网站的内容(StackOverflow的通知会通知我们是否有人为我们的问题写评论/答案等)。请注意,用户将使用我的应用程序作为网站而不是移动应用程序。 我遇到以下获取结果的答案,但我需要推送结果而不是获取结果。 根据建议,我在实体类中创建了一个简单方法,并向其中添加了@PostPersist,但此方法不起作用,因此基于此答案,我添加了persist