响应式编程--Reactor官方翻译

明宜年
2023-12-01

响应式编程

Wiki的定义

  Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
  响应式编程是一种异步的编程范式,它主要关心数据流的处理和变更的传播。这意味着,通过后端的编程语言可以更轻松地处理静态(数组)或是动态(事件推送)的数据展示。

More
  响应式编程最早由微软在.NET生态中提出,创建了Reactive Expression。随后RxJava在JVM中实现了响应式编程。经过长期的发展,在Java中出现了一套标准,包括响应式库的接口和交互的规则,接口请参考Java 9中的Flow类。
  响应式编程,经常出现在面向对象的语言中,是观察者模式的一个变形。你可能会想到我们熟悉的迭代器模式,响应式的流处理上也有Iterable-Iterator对,但是迭代器基于拉模型,响应式基于推模型。
  迭代器是一种命令式的编程模式,尽管Iterator只有一个获取数据的方法。何时调用next()方法来获取数据取决于开发者。而响应式的数据流,对应的是Publisher-Subscriber模式,当数据可用时,由发布者通知订阅者。这个推送的过程是响应式的关键。推送的操作是声明式的,而不是命令式的。也就是说编程人员只需定义期望使用的逻辑,而不是告知具体的步骤。

阻塞式处理带来的问题

  尽管硬件的水平在不断提高,但是在巨大用户的请求面前,如何提高软件的效率也是一个关键。
  一般来说,有两种提高程序的效率的方式:

 并行处理,也意味着需要更多的硬件资源。
 保持现有的硬件资源不变,提高现有硬件资源的利用率。

  我们常用的阻塞式编程,很容易遇到性能瓶颈。当引入额外的线程,又会带来线程之间的竞争和并发的问题。更糟糕的是,阻塞造成资源的浪费(会有线程占用资源却不干事^_^)。
  因此并行的策略并不是做好的解决手段。

异步或许是一剂灵药

   我们寻找更有效的方式,是通过使用异步的、非阻塞的编码。我们先让当前的进程切换去执行活跃的任务,当这个异步的进程结束后,再切换回当前进程。
  Java提供2种异步编程的模式:

 Callback: 异步的方法并不返回具体的值,而是接受一个callback参数(一般是个匿名类),当结果可用时,调用这个callback,处理结果。
 Future: 异步的方法有返回值,会直接返回一个Future<T>对象,程序处理的结果的类型是T,封装在Future中,但是这个值暂时没有值,直到异步的方法处理完毕,才可获取到结果。

  但是两种方式均有缺陷。
考虑这样的场景,展示一个用户的5个喜好及喜好的描述,如果该用户没有喜好,展示5条他的建议。使用Callback的编码如下:

userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

  但是使用Reactor的编码如下:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

  Reactor不仅减少代码量,也提供了超时的控制。

 类似资料: