当前位置: 首页 > 知识库问答 >
问题:

RxJava等效于简单ThreadPoolExecutor示例

韩彦君
2023-03-14

我已经退出Java游戏大约8年了,从那以后发生了很多变化。对我来说最大的挑战是RxJava/反应式。我正在寻找关于如何以完全反应式的方式执行以下等效操作的粗略指导。

下面使用ThreadPoolExecutor实现的基本需求是通过调用远程web服务来处理大量的内容,该服务的记录速率限制为每分钟100个请求。我的目标是尽可能快地处理,不丢弃任何东西,但仍然遵守下游利率限制。该代码已被简化,以避免错误、舱壁、断路器、重试逻辑等。

这段代码目前运行良好,但考虑到所有非阻塞的被动选项,它会导致大量的线程浪费。甚至我用来调用我的服务的HTTP客户端也提供了一个可流动的,我只是在执行器的20个线程中的每个线程中阻塞它。

我很想了解反应当量应该是什么。我所遇到的困难是,我找到的showcase中几乎所有的文档都使用静态源作为可观察对象(例如:从数组(1,2,3,4,5))。我知道解决方案可能涉及IoScheduler,也可能涉及groupBy,但我还没有弄清楚如何将来自HTTP客户端的可流动的代码合并到一个完整的链中,该链可以进行并行化(最多20个)和速率限制。

public class Example {
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);

            final HttpResponse<Boolean> response = flowable.blockingFirst();
            if (response.body()) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };

        final Runnable rateLimitedTask = 
                RateLimiter.decorateRunnable(rateLimiter, task);
        executor.submit(rateLimitedTask);
    }
}

谢谢你!

共有1个答案

方鸿振
2023-03-14

首先,要以完全非阻塞的方式构建它,您需要使用像Netty这样的非阻塞、异步HTTP客户端库。我不确定RxHttpClient是如何工作的。

假设你有一个清单。我会这样做:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap在响应出现时将其合并。

为了限制速率,您平面地图有第二个参数,它限制它并行订阅的内部流的数量。假设您想一次进行不超过10次调用。这样做:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();
 类似资料:
  • 问题内容: 我正在从xml配置转移到注释。我想转换一个会话范围的bean是 可以通过注释完成此操作吗?如果没有,我该怎么做才能使该声明继续工作? 问题答案: 在spring上下文xml中,执行以下操作: 请注意,尽管如此,你将需要为该包中的所有类编写接口。

  • 本文向大家介绍java  ThreadPoolExecutor使用方法简单介绍,包括了java  ThreadPoolExecutor使用方法简单介绍的使用技巧和注意事项,需要的朋友参考一下 java  ThreadPoolExecutor 前言: 在项目中如果使用发短信这个功能,一般会把发短信这个动作变成异步的,因为大部分情况下,短信到底是发送成功或者失败,都不能影响主流程。当然像发送MQ消息等

  • 我已经读了很多例子,博客文章,问题/答案关于//在Python 3.5中,许多是复杂的,我发现最简单的可能是这个。 仍然使用,为了学习Python中的异步编程,我想看一个更小的示例,以及执行基本异步/等待示例所需的最小工具是什么。 问题:是否可以给出一个简单的示例,说明/是如何工作的,只使用这两个关键字code运行异步循环—其他Python代码,而不使用其他函数? 例如:类似这样的东西: 但如果没

  • 问题内容: 该表示法是: 实际上不哈希对象;它实际上只是转换为字符串(通过它是一个对象,还是其他各种原始类型的内置转换),然后在“ ”中查找该字符串,而不对其进行哈希处理。也不会检查对象是否相等-如果两个不同的对象具有相同的字符串转换,则它们将彼此覆盖。 鉴于此-在JavaScript中是否有任何有效的hashmap实现?(例如,第二个Google结果产生的实现对任何操作都是O(n)。其他各种结果

  • 问题内容: 我正在尝试从Swift的iTu​​nesU中的“开发适用于iPhone和iPad的ios7应用程序”中复制斯坦福Matchismo游戏。 在第3讲幻灯片的第77页上,它显示了使用,这不是Swift上的选项。Swift文档示例显示了一个具有数组的示例,但是我不知道如何使Interface Builder将多个插座连接到同一个/ Array。 有人知道如何做到这一点吗? 我知道我可以创建1

  • 我怀疑这很容易,但我不能让它像我想的那样工作。我正在使用我的Firebase数据库的查询信息引用数据库ref。下面的代码工作得很好,但我不能在Match_01中硬编码(这纯粹是为了让代码工作)。 我需要做的是使用已传递到片段的 matchID 并使用 equalTo 而不是引用最终的子节点。 但这不起作用,我无法将最后一个子引用换成订单ByChild引用。 感谢所有的帮助。