RxJava中是否有一个操作员、一个外部库或一种我不知道的方法来创建一个可流动/可观察的,接收控制数据排放的函数,比如阀门?
我有一个巨大的json文件需要处理,但我必须得到文件的一部分,一个实体列表,处理它,然后得到另一部分,我尝试使用windows(),buffer(),但我传递给Flowable的双函数。generate()在收到第一个列表但尚未完成处理后继续执行。我也试过流动变形金刚。我是胡。阿卡诺克。rxjava3。但它只是在处理列表的flatMap()函数之前堆积项目
private Flowable<T> flowable(InputStream inputStream) {
return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {
final var token = jsonParser.nextToken();
if (token == null) {
emitter.onComplete();
}
if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
return jsonParser;
}
if (JsonToken.START_OBJECT.equals(token)) {
emitter.onNext(reader.readValue(jsonParser));
}
return jsonParser;
}, JsonParser::close);
}
编辑:我需要控制项目的释放,以避免内存和处理数据的函数过载,因为该函数读取和写入数据库,处理也需要按顺序进行。处理数据的函数不完全是我的,它是用RxJava编写的,我希望使用Rx。
我成功地解决了这个问题,但如果有其他方法,请告诉我:
public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {
if (booleanSupplier.get()) {
final var token = jsonParser.nextToken();
if (token == null) {
emitter.onComplete();
}
if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
return jsonParser;
}
if (JsonToken.START_OBJECT.equals(token)) {
emitter.onNext(reader.readValue(jsonParser));
}
}
return jsonParser;
}, JsonParser::close);
}
Edit2:这是我目前使用函数的方式之一
public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
final var atomicInteger = new AtomicInteger(0);
final var atomicBoolean = new AtomicBoolean(true);
return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
.buffer(pageSize)
.flatMapSingle(list -> {
final var counter = atomicInteger.addAndGet(1);
if (counter == numberOfPages) {
atomicBoolean.set(false);
}
return function.apply(list)
.doFinally(() -> {
if (atomicInteger.get() == numberOfPages) {
atomicInteger.set(0);
atomicBoolean.set(true);
}
});
});
}
就这样解决了
public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
return Flowable.defer(() -> {
final var token = jsonParser.nextToken();
if (token == null) {
return Completable.fromAction(jsonParser::close)
.doOnError(Throwable::printStackTrace)
.onErrorComplete()
.andThen(Flowable.empty());
}
if (JsonToken.START_OBJECT.equals(token)) {
final var value = reader.readValue(jsonParser);
final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
return Flowable.concat(just, flowable(jsonParser, reader, valve));
}
return flowable(jsonParser, reader, valve);
});
}
当上面的”阀门“打开时,输出下面的数值 用法 Your browser does not support the video tag. 案例:数字标签 功能:显示通过的数字。 工作原理 上面的输入接收为YES/NO;下面的输入接收为数值。如果上面的输入是YES,那么下面的输入将被发送到节点的输出;否则,节点将输出NO。
我试图在RxJava中找到一个能够以特定方式进行节流的操作符: 发射元素 我似乎找不到一个符合这种行为的。我看了一些类似的,但似乎没有一个是正确的。 > /-发出元素序列中的最后一个元素,这些元素之间的间隔很短 查看设置的时间间隔并在每个时间间隔中发出最后一个元素的示例。 throttleFirst查看设置的时间间隔并在每个时间间隔中发出第一个元素。这似乎是最接近我想要的,但并不完全一样。 是否有
我想实现一些类似大门机制的东西。我需要一个PublishSubject和几个订阅者。当PublishSubject通过onNext发送数据时,只有一个订阅服务器将接收数据。 例如:我有3个等于片断在标签。他们订阅了全局发布的OnLoginPublisher。当ONREME或onPause时,CALL gate变为打开或关闭。当onLogin被调用并且由于屏幕上没有这些片段而没有打开任何门时,onN
RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列组合异步和基于事件的程序。更多关于ReactiveX的资料,可以查看 ReactiveX 介绍 页面。 RxJava介绍 RxJava 是轻量级的 RxJava尽力做到非常轻巧。它仅关注Observable的抽象和与之相关的高层函数,实现为一个单独的JAR文件。 RxJava 是一个多语言实现
概述 φ4直通式节流阀是一种直通式流量控制阀,用于流量的调节和控制,手动旋钮控制调节,可完全锁定关闭。 尺寸图纸
你可以在这里找到JVM平台几种语言的例子 language adaptor: RxGroovy 示例 RxClojure 示例 RxScala 示例 下面的示例从一个字符串列表创建一个Observable,然后使用一个方法订阅这个Observable。 Java public static void hello(String... names) { Observable.from(name