当前位置: 首页 > 知识库问答 >
问题:

RxJava反压和对生产者的调用次数

竺勇
2023-03-14

我试图在我的Android应用程序中使用rx Java中的背压创建无限滚动。我希望它只调用请求的外部服务次数(在调用request(1)之后)。但在使用flatmap后,每个订阅都会加载16页。

在我的代码下面有预期的结果。几乎每个测试都因为第一次请求而失败(n=16)

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

public class ServiceObservablesTest {


    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservalble = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // at subscribe rxJava makes request for 16 elements - probably because of flatMap
                // after first request with 16 elements everything seems to work fine even if i ignore the 'n' param

                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);

            });
        });
        return metaObservalble.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }


    private DataProvider provider;

    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }


    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given

        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);

        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();


        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);

    }


    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given

        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);


        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();


        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);

    }

    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);

    }

    @Test
    public void shouldWorkWithMultipleSubscribers() {

        //given

        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);

        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);

        //when
        subscriber1.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber2.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }

}

共有1个答案

鲜于承基
2023-03-14

反压力旨在协商并发的消费者生产者行为,并允许程序作者设置策略来解决当产生的数据速率超过消耗的数据速率时该做什么。

这就是说,您将看到,组合可观察对象(如merge)的运算符将为您提供与所需数据量不一致的请求量。合并时,外部可观测对象(可观测对象的可观测对象)将始终在RxAndroid(RxJava中为128)上接收16的请求。然后,当它接收列表的内部可观察对象时,每个内部可观察对象将接收一个基于来自下游订户的请求量的请求。如果你试图写一个可观察的

我建议您将屏幕y位置映射为页面结束事件,然后使用扫描将其转换为单调递增的数字,然后将该数字映射为对数据提供者的调用。requestPage()

screenYPositions
    .map(this::isUninitializedOrNearEndOfPage)
    .scan(1, (event, pageNumber) -> pageNumber + 1 )
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);

 类似资料:
  • 我读了一些RxJava中的背压文档,但我找不到详细的解释,比如它是如何在库中内部发生的,每个人都只是总结说“生产者”太快,“消费者”太慢。 例如,如下面的代码: 我已经看过了RxJava源代码,所以我的理解是,在主线程中,我们将每毫秒发出一次事件,一旦发出,我们就将值传递给系统。出来println(i)方法,并将其扔进newhead调度器的线程池,然后在可运行程序中运行该方法。 所以我的问题是,异

  • 我在producer端启用了snappy压缩,批量大小为64kb,处理每个1KB的消息,并将延迟时间设置为inf,这是否意味着在我处理64条消息之前,producer不会将消息发送给kafka out topic。。。Kafk64将发送一条消息,或每一个消息的制作人将发送另一个消息。。。 因为偏移量一个接一个地增加,而不是64的倍数 编辑-使用flink-kafka连接器

  • 我仍在尝试将RXJava2用于多个订阅者使用的轮询服务。它工作得很好,除了它总是调用两次。 我尝试使用publish(1)、take(1)、share()、refCount()等,但结果总是一样。叫两次或多次。 我还是不明白为什么它叫了两次。新订阅服务器应该接收最新发出的值,并且只在更改后的值(如果hashmap更改了)返回为modified List。 我的民意调查可观察到(单例) 我的谓词用于

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?