当前位置: 首页 > 知识库问答 >
问题:

创建一个Flowable,以有限的速率发出项目,以避免缓冲事件的需要

洪光霁
2023-03-14

我有一个数据访问对象,它将数据源中的每个项目传递给消费者:

public interface Dao<T> {
    void forEachItem(Consumer<T> item);
}

这总是以单线程方式生成项目——我目前无法更改此接口

我想从这个界面创建一个可流动的:

private static Flowable<String> flowable(final Dao dao) {
    return Flowable.create(emitter -> {
        dao.forEachItem(item ->
                emitter.onNext(item));
        emitter.onComplete();
    }, ERROR);
}

如果我使用这个Flowable的情况下,处理需要的时间比项目发出的速度长,那么我可以理解得到一个丢失的背压异常,因为我使用ERROR模式:

    Dao<String> exampleDao =
            itemConsumer ->
                    IntStream.range(0, 1_000).forEach(i ->
                            itemConsumer.accept(String.valueOf(i)));

    flowable(exampleDao)
            .map(v -> {
                Thread.sleep(100);
                return "id:" + v;
            })
            .blockingSubscribe(System.out::println);

如果操作比生产者慢得多,我不希望缓冲项目——看起来这可能会导致在非常大的数据集上耗尽内存。

我希望会有一个背压模式,允许发射器在检测到背压时阻止下一个/完成事件,但似乎不是这样?

在我的例子中,正如我所知,dao以单线程的方式生成项目,我认为我可以做如下事情:

  dao.forEachItem(item -> {
    while (emitter.requested() == 0) {
      waitABit();
    }         
    emitter.onNext(item)
  });

但这似乎永远不会改变。

我的方法有多错误?:-)考虑到我(相对限制性)的一系列情况,有没有一种方式能够以一种尊重下游背压的方式生产产品?

我知道我可以用一个单独的进程写入队列,然后根据从该队列中的消耗编写一个Flowable-这是首选的方法吗?

共有1个答案

柴良哲
2023-03-14

检查Flowable的部分,尤其是带有Supscription.request(long)的部分。我希望这能让你走上正确的道路。

本例中的TestProducer生成给定范围内的Integer对象,并将它们推送到其订户。它扩展了的可流动性

对于传递给订阅者的订阅而言,重要的是可以从该onNext()调用中递归调用订阅者上调用onNext()request()方法。为了防止堆栈溢出,所示的实现使用outStandingRequests计数器和isProducing标志。

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

本例中的使用者扩展了DefaultSubscriber

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

在测试类的以下主要方法中,生产者和消费者被创建并连接起来:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

运行该示例时,日志文件显示消费者连续运行,而生产者仅在需要重新填充rxjava2的内部流动缓冲区时才处于活动状态。

 类似资料:
  • 我正在尝试更新此代码以处理Discord。jsv12。我遇到了一个错误,我有点困惑于如何修复这个错误。我已经更新了一些代码来处理Discord。jsv12。 client.channels.fetch... 在客户端。(/home/runner/Log/index.js:44:40)在客户端。在客户端发出(events.js:314:20)。事件发射器。在MessageCreateAction处发

  • 我正在尝试正确地使用ByteBuffer和BigEndian字节顺序格式。。 我有几个字段,我试图把它存储在Cassandra数据库之前放在一个单一的ByteBuffer中。 我将要写入Cassandra的字节数组由三个字节数组组成,如下所述- 现在,我需要快速压缩attributeValue数据,然后再将其存储在Cassandra中- 现在,我将编写,和snappy压缩的一起组成一个单字节数组,

  • 如果你已经使用过Android Studio和Gradle,那么这一章会比较简单。我不会给出很多细节和截图,因为用户界面和细节可能会一直变化。 我们的应用是由一个简单的天气app组成,正如所使用的Google’s Beginners Course in Udacity。我们可能会关注不同的事情,但是app的想法都是一样的,你会发现在一个典型的app里面会包括很多不同的东西。如果你的Android开

  • 我想创建一个简单的Java类文件,并从我的所有项目访问它。 我知道我可以通过复制/粘贴方法来完成,但在这种情况下,类文件将独立于其原始源。我在原始类文件中所做的任何更改都不会影响已经复制到其他项目中的类文件。 在Android Studio中有链接类文件的方法吗?

  • 垃圾回收期在管理内存方面非常出色,它非常高效地移除不再使用的对象。但是无论你如何看待它,分配和销毁一个基于堆内存的对象花费处理器时间比分配和销毁不是基于堆内存的对象要多。在函数内创建大量的引用类型对象会引入严重的性能消耗问题。 所以不能让垃圾回收器超负荷工作。你可以借鉴一些简单的技巧最小化垃圾回收器的工作。所有的引用类型对象,即使是局部变量,都被分配存储在堆内存上。每个引用类型的局部变量在函数结束

  • 首先,打开Android Studio并选择Create new Project,然后它会让你输入一个名字,你可以任意取一个名字,比如:Weather App。然后你需要输入公司域名。如果你不会真正发布这个app,这个字段就不是特别重要了,但是如果你有的话可以使用自己的域名。然后任意选择一个目录作为这个项目的保存地址。 下一步,它会让你选择最小的API版本。我们选择API 15,因为我们有一个库需