我有一个注册了回拨的服务,现在我想将其公开为可流动的
,具有某些要求/限制:
以下是我目前的情况
class MyBroadcaster {
private PublishProcessor<Packet> packets = PublishProcessor.create();
private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();
public MyBroadcaster() {
//this is actually different to my exact use but same conceptually
registerCallback(packets::onNext);
}
public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
return backpressuredPackets.observeOn(scheduler);
}
}
我不确定这是否符合我的要求。在onBackpressureLatest
javadoc上有一条关于observeOn
的注释,我不明白:
请注意,由于背压请求是如何通过subscribeOn/observeOn传播的,因此从下游请求超过1个并不保证onNext事件的连续传递
我还有其他问题:
onBackpressureLatest
调用是否使项目不再进行多播
我不确定这是否符合我的要求。
事实并非如此。分别应用onBackpressureLatest
或onBackpressureBuffer
和observealpacketson
中的observeOn
。
OnBackPressureRelatest调用是否使项目不再进行多播?
多播是由PublishProcencer
完成的,不同的订阅者将独立建立一个通道,其中onBackpressureXXX
和观察
运算符在单个订阅者的基础上生效。
如何测试我的需求?
使用TestSubscriber
(Flowable.test()
)通过有损或无损Flowable
订阅,将一组已知的数据包馈送到数据包中,并查看它们是否都通过
TestSubscriber到达。assertValueCount()
或TestSubscriber。values()
。有损耗的应该是1。。N,在一个宽限期之后,无损的应该有N个值。
奖励:如果我有多个这样的发布者(在同一个类或其他地方),那么让同一模式可重用的最佳方法是什么。使用委派/额外方法创建我自己的可流动性?
您可以将
观察AllPacketsOn
转换为FlowableTransov
,而不是MyBroadcaster上的方法调用,而是使用comush
,例如:
class MyTransformers {
public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
return f -> f.onBackpressureLatest().observeOn(s);
}
}
new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);
我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2) 我从这个问题的答案中借用了一种方法。 我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)
我已经将我的nodejs应用程序配置为使用MongoDB。我可以成功地连接和添加数据到我的mongodb实例。我的应用程序配置如下(敏感信息编辑): 如果我点击endpoint,那么pizza文档就会很好地在数据库中创建,但我注意到连接从未关闭;这意味着endpoint从不将“all good”字符串作为响应发送。而且,对的任何后续请求都会抛出以下错误: 我似乎没有正确处理promise和asyn
我有一些大致看起来像这样的代码: 是一个冷可观察的,它在标准的同时循环中产生(但是在另一个线程上!)。在未配置时,获取一些数据并调用。 我面临的问题是太“慢”,它跟不上生成数据的速度。这是一个异步操作,本质上是将对内存中某个位置的客户机的调用排队(无法访问此代码),并最终填充堆。 我需要找到一种方法,当这些调用开始堆积时,让停止生成。有没有一种方法可以使用内置操作符来实现这一点?
我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink
问题内容: 假设我有某种游戏。我有一个buyItem函数,如下所示: 如果我对该路由进行垃圾邮件处理,直到扣除用户余额(第二次查询),则用户余额仍为正。 我尝试过的 问题是将在第一〜5项要求。因此,这也不起作用。 我们如何处理这种情况?如果重要的话,我正在使用Sails.JS框架。 问题答案: 通过该方法,Sails 1.0现在具有完整的事务支持。例: 更新资料 正如一些评论者所指出的,启用连接池
我做错了什么?我如何将结果与可观察的结果结合起来?最好不是用Lamda符号。 我查看的其他资源:-android rxjava2/retrofit2链接调用与分页令牌