当前位置: 首页 > 知识库问答 >
问题:

RxJS节流特性;立即获取第一个值

欧阳狐若
2023-03-14

示例PRUNKR:https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

我知道RxJS(5.0 beta.4)有以下3种节流方法:

auditTime()throttleTime()debounceTime()

我正在寻找的行为是默认情况下在throttle上执行的行为:

理论上,这应该看起来像:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

但是

  • throttleTime如果在节流超时时发出,则从不给我最后一个值

我可以结合任何RxJS方法来实现所描述的行为吗?

共有3个答案

柯耀
2023-03-14

我采取了AuditTime操作符,并更改了2行以实现所需的行为。

新普朗克:https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

原件:

  • https://github.com/ReactiveX/rxjs/blob/master/src/operator/auditTime.ts

变化:

从(AuditTime):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

到(auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

所以我开始超时后的值是下一个ed。

用法:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))
盖和泰
2023-03-14

对于较旧的RxJs,我编写了一个concatLatest运算符,它可以完成您想要的大部分操作。有了它,你可以得到你的节流行为与以下代码:

const delay = Rx.Observable.empty().delay(500);
inputObservable
    .map(value => Rx.Observable.of(value).concat(delay))
    .concatLatest()
    .subscribe(...);

接线员来了。我尝试将其更新为使用RxJS5:

Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;

    return Rx.Observable.create(function (observer) {
        var latest,
            isStopped,
            isBusy,
            outerSubscription,
            innerSubscription,
            subscriptions = new Rx.Subscription(function () {
              if (outerSubscription) {
                outerSubscription.unsubscribe();
              }
              if (innerSubscription) {
                innerSubscription.unsubscribe();
              }
            }),
            onError = observer.error.bind(observer),
            onNext = observer.next.bind(observer),
            innerOnComplete = function () {
                var inner = latest;
                if (inner) {
                    latest = undefined;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                }
                else {
                    isBusy = false;
                    if (isStopped) {
                        observer.complete();
                    }
                }
            };

        outerSubscription = source.subscribe(function (newInner) {
            if (isBusy) {
                latest = newInner;
            }
            else {
                isBusy = true;
                if (innerSubscription) {
                  innerSubscription.unsubscribe();
                }
                innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.complete();
            }
        });

        return subscriptions;
    });
};

以下是更新的plunkr:https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview

注:我已将您的lodash版本更新为最新版本。在lodash 4.7中,我重写了throttle/debounce操作符来修复一些边缘案例错误。你用的是4.6。1仍然有一些bug,尽管我不认为它们影响了你的测试。

穆文斌
2023-03-14

对于在2018年之后寻找这一点的人:这是一年前添加的,但由于某些原因,文档尚未更新。

RxJS提交

您只需将配置对象传递给throttleTime。默认值为{leading:true,training:false}。要实现这里讨论的行为,您只需将trailing设置为true{leading:true,trailing:true}

编辑:

为了完整起见,这里有一个工作片段:

import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'

...

observable.pipe(
  throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)
 类似资料:
  • 问题内容: 有没有一种方法来获取JSON对象的第一个属性的名称? 我想做这样的事情: 编辑: 我得到一个JSON对象,其中包含具有图像URL的数组类别。 像这样: 然后,我遍历该对象以插入图像,而我真正想要的只是一种优雅的方式来查看首先插入哪个类别。一开始我只是做了 但这有点丑陋…所以基本上这只是一个优雅的问题:p 问题答案: 不能 保证 对象属性的顺序与您放置它们的方式相同。但是,实际上,所有主

  • 我想知道一张linkedhashmap的名字,但只知道最后一张。 这是我的hashmap,我只打印出姓氏。我如何得到第一个?有没有办法把它链接到密钥上。例如,“gumt”打印出Gumtäktsuni.bibliotek? 输出: 我也有一个生效的节点类。节点类:

  • 我想在JAVA程序中执行一个EXE文件。 它工作正常,但我希望EXE的输出立即在我的JAVA程序的文本区域中。 目前,我在“ping”命令完全完成后得到输出(因此JAVA程序挂起了大约3秒)。但是我想马上有结果... 我做错了什么? 突击队向后。 好吧,我想使用这个程序:https://iperf.fr/iperf-download.php 输出如下所示: 不过,我只有在iperf运行后才能得到这

  • 问题内容: 我使用Hibernate 5.2.5(如果重要的话,也可以使用kotlin和spring 4.3.5),并且我希望延迟加载类的某些字段。但是问题是所有字段都立即加载,我没有任何特殊的Hibernate设置,也没有使用Hibernate.initialize()。 这就是我的查询方式 TaskRepoImpl: TaskService: 并输出: 请告知我的代码出了什么问题以及如何使Hi

  • 下面是HashMap中包含的值 获取第一个键(即活动键)的Java代码 我们如何收集第一个键“值”(即33),我想把“键”和“值”都存储在单独的变量中。

  • 问题内容: 我正在尝试获取价值TTFB和连接价值 它会像 但是,我只需要golang变量中value的值。 另外,有什么方法可以在不专门使用curl的情况下获取值? 问题答案: 自Go 1.7起就有对此的内置支持。Go 1.7添加了HTTP跟踪,请阅读博客文章:HTTP跟踪简介 您可以指定在发出HTTP(S)请求时在适当的阶段/点调用的回调函数。您可以通过创建值来指定回调函数,然后使用来“武装”它