当前位置: 首页 > 知识库问答 >
问题:

RxJava阀门用例

艾跃
2023-03-14

RxJava中是否有一个操作员、一个外部库或一种我不知道的方法来创建一个可流动/可观察的,接收控制数据排放的函数,比如阀门?

我有一个巨大的json文件需要处理,但我必须得到文件的一部分,一个实体列表,处理它,然后得到另一部分,我尝试使用windows(),buffer(),但我传递给Flowable的双函数。generate()在收到第一个列表但尚未完成处理后继续执行。我也试过流动变形金刚。我是胡。阿卡诺克。rxjava3。但它只是在处理列表的flatMap()函数之前堆积项目

private Flowable<T> flowable(InputStream inputStream) {

    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        final var token = jsonParser.nextToken();

        if (token == null) {
            emitter.onComplete();
        }

        if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
            return jsonParser;
        }

        if (JsonToken.START_OBJECT.equals(token)) {
            emitter.onNext(reader.readValue(jsonParser));
        }

        return jsonParser;
    }, JsonParser::close);
}

编辑:我需要控制项目的释放,以避免内存和处理数据的函数过载,因为该函数读取和写入数据库,处理也需要按顺序进行。处理数据的函数不完全是我的,它是用RxJava编写的,我希望使用Rx。

我成功地解决了这个问题,但如果有其他方法,请告诉我:

public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        if (booleanSupplier.get()) {
            final var token = jsonParser.nextToken();

            if (token == null) {
                emitter.onComplete();
            }

            if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
                return jsonParser;
            }

            if (JsonToken.START_OBJECT.equals(token)) {
                emitter.onNext(reader.readValue(jsonParser));
            }

        }
        
        return jsonParser;
    }, JsonParser::close);
}

Edit2:这是我目前使用函数的方式之一

public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
    final var atomicInteger = new AtomicInteger(0);
    final var atomicBoolean = new AtomicBoolean(true);

    return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
            .buffer(pageSize)
            .flatMapSingle(list -> {

                final var counter = atomicInteger.addAndGet(1);

                if (counter == numberOfPages) {
                    atomicBoolean.set(false);
                }

                return function.apply(list)
                        .doFinally(() -> {
                            if (atomicInteger.get() == numberOfPages) {
                                atomicInteger.set(0);
                                atomicBoolean.set(true);
                            }
                        });
            });
}

共有1个答案

宗波涛
2023-03-14

就这样解决了

 public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
    return Flowable.defer(() -> {
        final var token = jsonParser.nextToken();

        if (token == null) {

            return Completable.fromAction(jsonParser::close)
                    .doOnError(Throwable::printStackTrace)
                    .onErrorComplete()
                    .andThen(Flowable.empty());
        }


        if (JsonToken.START_OBJECT.equals(token)) {
            final var value = reader.readValue(jsonParser);
            final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
            return Flowable.concat(just, flowable(jsonParser, reader, valve));
        }


        return flowable(jsonParser, reader, valve);
    });
}
 类似资料:
  • 当上面的”阀门“打开时,输出下面的数值 用法 Your browser does not support the video tag. 案例:数字标签 功能:显示通过的数字。 工作原理 上面的输入接收为YES/NO;下面的输入接收为数值。如果上面的输入是YES,那么下面的输入将被发送到节点的输出;否则,节点将输出NO。

  • 我试图在RxJava中找到一个能够以特定方式进行节流的操作符: 发射元素 我似乎找不到一个符合这种行为的。我看了一些类似的,但似乎没有一个是正确的。 > /-发出元素序列中的最后一个元素,这些元素之间的间隔很短 查看设置的时间间隔并在每个时间间隔中发出最后一个元素的示例。 throttleFirst查看设置的时间间隔并在每个时间间隔中发出第一个元素。这似乎是最接近我想要的,但并不完全一样。 是否有

  • 我想实现一些类似大门机制的东西。我需要一个PublishSubject和几个订阅者。当PublishSubject通过onNext发送数据时,只有一个订阅服务器将接收数据。 例如:我有3个等于片断在标签。他们订阅了全局发布的OnLoginPublisher。当ONREME或onPause时,CALL gate变为打开或关闭。当onLogin被调用并且由于屏幕上没有这些片段而没有打开任何门时,onN

  • RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列组合异步和基于事件的程序。更多关于ReactiveX的资料,可以查看 ReactiveX 介绍 页面。 RxJava介绍 RxJava 是轻量级的 RxJava尽力做到非常轻巧。它仅关注Observable的抽象和与之相关的高层函数,实现为一个单独的JAR文件。 RxJava 是一个多语言实现

  • 概述 φ4直通式节流阀是一种直通式流量控制阀,用于流量的调节和控制,手动旋钮控制调节,可完全锁定关闭。 尺寸图纸

  • 你可以在这里找到JVM平台几种语言的例子 language adaptor: RxGroovy 示例 RxClojure 示例 RxScala 示例 下面的示例从一个字符串列表创建一个Observable,然后使用一个方法订阅这个Observable。 Java public static void hello(String... names) { Observable.from(name