到目前为止,我一直在使用RxJava,但我开始使用 projectreactor.io 的反应器堆芯,因为它符合反应性流规范。
在下面的测试中,我创建了一个生成随机数的热通量(可连接Flux)。我立即连接()它,它预取了256个值(我可以在日志中看到258个值)。我等待5秒钟来模拟订阅者直到一段时间后才会订阅。
主线程唤醒后,RnApp订阅ConnectableFlux,randomNumberGenerator。订阅(新的RnApp())
。然后
RnApp。调用onSubscribe()
并请求10个元素。之后,出现java.lang.IllegalStateException:队列已满
异常(调用了
RnApp.onError()
),为什么?
订户:
public class RnApp implements Subscriber<Float>{
private Subscription subscription;
private List<Float> randomNumbers = new ArrayList<Float>();
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable err) {
err.printStackTrace();
}
@Override
public void onNext(Float f) {
if(this.randomNumbers.size()>=10){
this.subscription.cancel();
}else{
this.randomNumbers.add(f);
}
}
@Override
public void onSubscribe(Subscription subs) {
this.subscription = subs;
this.subscription.request(10);
}
}
发布者测试:
@Test
public void randomNumberReading() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ConnectableFlux<Float> randomNumberGenerator = ConnectableFlux.<Float>create( (c) -> {
SecureRandom sr = new SecureRandom();
int i = 1;
while(true){
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-----------------------------------------------------"+(i++));
c.onNext(sr.nextFloat());
}
}).log().subscribeOn(Computations.concurrent()).publish();
randomNumberGenerator.connect();
Thread.sleep(5000);
randomNumberGenerator.subscribe(new RnApp());
latch.await();
}
日志:
11:12:05.125 [main] DEBUG r.core.util.Logger$LoggerFactory - Using Slf4j logging framework
11:12:05.363 [concurrent-1] INFO reactor.core.publisher.FluxLog - onSubscribe(io.pivotal.literx.Part10SubscribeOnPublishOn$$Lambda$1/1586600255@29d4caeb)
11:12:05.371 [concurrent-1] INFO reactor.core.publisher.FluxLog - request(256)
-----------------------------------------------------1
11:12:06.000 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.39189225)
-----------------------------------------------------2
...
-----------------------------------------------------257
11:12:08.683 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.34729618)
-----------------------------------------------------258
11:12:08.697 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.7729547)
java.lang.IllegalStateException: Queue full?!
at reactor.core.publisher.FluxPublish$State.onNext(FluxPublish.java:246)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:134)
at reactor.core.publisher.FluxLog$LoggerBarrier.doNext(FluxLog.java:130)
at reactor.core.subscriber.SubscriberBarrier.onNext(SubscriberBarrier.java:85)
at reactor.core.subscriber.SubscriberWithContext.onNext(SubscriberWithContext.java:92)
at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:132)
at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:145)
at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:114)
at reactor.core.publisher.FluxGenerate$SubscriberProxy.request(FluxGenerate.java:245)
at reactor.core.subscriber.SubscriberBarrier.doRequest(SubscriberBarrier.java:146)
at reactor.core.publisher.FluxLog$LoggerBarrier.doRequest(FluxLog.java:160)
at reactor.core.subscriber.SubscriberBarrier.request(SubscriberBarrier.java:135)
at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:71)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onSubscribe(FluxSubscribeOn.java:129)
at reactor.core.publisher.FluxLog$LoggerBarrier.doOnSubscribe(FluxLog.java:122)
at reactor.core.subscriber.SubscriberBarrier.onSubscribe(SubscriberBarrier.java:67)
at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:72)
at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:67)
at reactor.core.publisher.FluxSubscribeOn$SourceSubscribeTask.run(FluxSubscribeOn.java:363)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:919)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:883)
at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:842)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
create()
期望每个回调调用下一个。或者,您可以检查 Flux.yield,
它为您提供了额外的发射方法来处理下游状态(背压或取消)。或者你可以使用Flux.generate
,它类似于创建
,但每个请求调用一次,这样你就可以有效地下一
个传递到需求。
这3个Flux
生成器目前正在讨论中,因此我们可以讨论更好的替代方案http://github.com/reactor/reactor-core/issues.
您还可以使用Flux#delaySubscription阻止发布预取,例如使用最新的快照单播处理器:
UnicastProcessor<Object> p = UnicastProcessor.create();
flux.delaySubscription(p).publish(128).autoConnect().subscribe();
//...
p.onNext(new Object());
与RxJava一样,如果您使用create()
,您将自行处理取消和背压。您可以使用标准运算符构建生成器:
ConnectableFlux<Double> secureRandomFlux = Flux.using(
() -> new SecureRandom(),
sr -> Flux.interval(10, TimeUnit.MILLISECONDS)
.map(v -> sr.nextDouble())
.onBackpressureDrop()
sr -> { }
).publish();
我正在尝试在我的maven项目上运行单元测试(使用Wiremck),并观察到以下错误。 我尝试将添加到中,如下所示 并已将其放在我的项目的文件中,并验证它是否是通过执行设置的,但我仍然收到相同的错误。 它在带有Java 8 u191的Windows机器上工作,并且没有添加任何,但在带有Java 8 u181的Linux上却不起作用。 我还尝试向我的< code>pom.xml添加以下依赖项 但仍然
我编写了一个Spring-Cloud-Stream应用程序,其中制作人将消息发布到指定的Kafka主题。我的问题是,如何添加生产者回调以接收确认/确认消息已成功发布到主题上?就像我们在《KafkaSpring》中所做的那样。发送(记录,新回调{…}) (维护异步生产者)。下面是我的代码: 如何确保tryEmitNext已成功写入主题? 实施ProducerListener是否是一种解决方案,是否可
我正在尝试使用Reactor Netty连接到docker容器上运行的消息队列。由于依赖性问题,我以独立的方式执行此操作,而不是使用SpringFlux。 从示例中的反应Netty留档,我看到有一种方法可以连接到服务器并获得响应: 但是当我之后尝试通过System.out.println()显示输出时,什么都不会发生。 我也试图了解如何使用: <代码>通量 但我不确定该怎么办。我在文档中看到了一个
问题内容: 将DoOutput设置为true时,出现非法状态异常。 为相同显示的堆栈跟踪为: 我没有发现我发送请求时做错了什么。任何人都可以指出缺少的内容或我做错了什么 问题答案: 我遇到了同样的问题并解决了。就我而言,这是因为我在NetBeans的调试界面中忘记了监视。希望它可以帮助其他人犯同样的错误。 如果您有任何相对于请求的响应值,如:手表,,甚至只是,你会在调试模式下出现此错误。 所有先前
异常跟踪: “”java.lang.IllegalStateException:在org.apache.catalina.core.applicationDispatcher.doForward(applicationDispatcher.java:328)在org.apache.catalina.core.applicationDispatcher.doForward(applicationDi
问题内容: 这是我的用法- 另外,我在http GET周围放置了一个finally块- 这是我的堆栈跟踪- 我正在使用Quartz计划监视Http端点的工作。这是我的连接池配置 Maven依赖..工件版本 编辑 -好吧,通过不关闭finally块中的CloseableHttpClient,问题解决了。有人能说出为什么这样吗? 如果关闭客户端,为什么连接池会关闭? 是上面的closeablehttp