当前位置: 首页 > 知识库问答 >
问题:

使用反应器发布器避免GC压力

沈建柏
2023-03-14

在我们的代码中,回答服务间HTTP请求的典型句柄函数如下所示:

final List<Function<ChangeEvent, Mono<Void>>> triggerOtherMicroservices;

@PostMapping("/handle")
public Mono<Void> handle(@RequestBody ChangeEvent changeEvent) {
    return Mono
            .defer(() -> someService.callToAnotherMicroServiceToFetchData(changeEvent))
            .subscribeOn(Schedulers.parallel())
            .map(this::mapping)
            .flatMap(data -> databaseService.save(data))
            .thenMany(Flux.fromIterable(triggerOtherMicroservices).flatMap(t -> t.apply(changeEvent)))
            .then();
}

如果我没有理解错的话,这意味着每次调用handle时,都需要实例化整个管道(通常具有巨大的stacktraces)(并因此在以后收集)。

我的问题是:我能不能以某种方式“准备”一次整个流,以后再重用它?

进一步思考我可以做的是:

final List<Function<ChangeEvent, Mono<Void>>> triggerOtherMicroservices;

final Mono<Void> mono = Mono
        .defer(() -> Mono
                .subscriberContext()
                .map(context -> context.get("event"))
                .flatMap(event -> someService.callToAnotherMicroServiceToFetchData(event))
        )
        .subscribeOn(Schedulers.parallel())
        .flatMap(data -> databaseService.save(data))
        .thenMany(Mono
                .subscriberContext()
                .map(context -> context.get("event"))
                .flatMap(event -> Flux
                        .fromIterable(triggerOtherMicroservices)
                        .flatMap(t -> t.apply(event)))
        )
        .then(); 

public Mono<Void> handle(@Validated ChangeEvent changeEvent) throws NoSuchElementException {
    return mono.subscriberContext(context -> context.put("event", changeEvent));
}

无论如何,我怀疑这不是subscribercontext的用意。

共有1个答案

龚凯泽
2023-03-14

注意:有很多JVM实现,这个答案并不声称已经测试了所有的实现,也不是所有可能情况的一般说明。

根据html" target="_blank">https://www.bettercodebytes.com/the-cost-of-object-create-in-java-including-garbas-collection/,当对象只存在于一个方法中时,有可能没有对象创建的开销。这是因为JIT实际上并不实例化对象,而是直接执行包含的方法。因此,以后也不需要进行垃圾收集。

结合问题进行的测试可以这样实现:

final List<Function<Event, Mono<Void>>> triggerOtherMicroservices = Arrays.asList(
        event -> Mono.empty(),
        event -> Mono.empty(),
        event -> Mono.empty()
);

final Mono<Void> mono = Mono
        .defer(() -> Mono
                .subscriberContext()
                .<Event>map(context -> context.get("event"))
                .flatMap(this::fetch)
        )
        .subscribeOn(Schedulers.parallel())
        .flatMap(this::duplicate)
        .flatMap(this::duplicate)
        .flatMap(this::duplicate)
        .flatMap(this::duplicate)
        .thenMany(Mono
                .subscriberContext()
                .<Event>map(context -> context.get("event"))
                .flatMapMany(event -> Flux
                        .fromIterable(triggerOtherMicroservices)
                        .flatMap(t -> t.apply(event))
                )
        )
        .then();

@PostMapping("/event-prepared")
public Mono<Void> handle(@RequestBody @Validated Event event) throws NoSuchElementException {
    return mono.subscriberContext(context -> context.put("event", event));
}

@PostMapping("/event-on-the-fly")
public Mono<Void> handleOld(@RequestBody @Validated Event event) throws NoSuchElementException {
    return Mono
            .defer(() -> fetch(event))
            .subscribeOn(Schedulers.parallel())
            .flatMap(this::duplicate)
            .flatMap(this::duplicate)
            .flatMap(this::duplicate)
            .flatMap(this::duplicate)
            .thenMany(Flux.fromIterable(triggerOtherMicroservices).flatMap(t -> t.apply(event)))
            .then();
}


private Mono<Data> fetch(Event event) {
    return Mono.just(new Data(event.timestamp));
}

private Mono<Data> duplicate(Data data) {
    return Mono.just(new Data(data.a * 2));
}

数据:

long a;

public Data(long a) {
    this.a = a;
}

@Override
public String toString() {
    return "Data{" +
            "a=" + a +
            '}';
}

事件:

 @JsonSerialize(using = EventSerializer.class)
 public class Event {
     UUID source;
     long timestamp;

     @JsonCreator
     public Event(@JsonProperty("source") UUID source, @JsonProperty("timestamp") long timestamp) {
         this.source = source;
         this.timestamp = timestamp;
     }

     @Override
     public String toString() {
         return "Event{" +
                 "source=" + source +
                 ", timestamp=" + timestamp +
                 '}';
     }
 }

EventSerializer:

 public class EventSerializer extends StdSerializer<Event> {

     public EventSerializer() {
         this(null);
     }

     public EventSerializer(Class<Event> t) {
         super(t);
     }

     @Override
     public void serialize(Event value, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException {
         jsonGenerator.writeStartObject();
         jsonGenerator.writeStringField("source", value.source.toString());
         jsonGenerator.writeNumberField("timestamp", value.timestamp);
         jsonGenerator.writeEndObject();
     }
 }
 @SpringBootTest
 @AutoConfigureWebTestClient
 class MonoAssemblyTimeTest {

     @Autowired
     private WebTestClient webTestClient;

     final int number_of_requests = 500000;

     @Test
     void measureExecutionTime() throws IOException {
         measureExecutionTime("on-the-fly");
         measureExecutionTime("prepared");
     }

     private void measureExecutionTime(String testCase) throws IOException {
         warmUp("/event-" + testCase);

         final GCStatisticsDifferential gcStatistics = new GCStatisticsDifferential();
         long[] duration = benchmark("/event-" + testCase);

         StringBuilder output = new StringBuilder();
         int plotPointsInterval = (int) Math.ceil((float) number_of_requests / 1000);

         for (int i = 0; i < number_of_requests; i++) {
             if (i % plotPointsInterval == 0) {
                 output.append(String.format("%d , %d %n", i, duration[i]));
             }
         }

         Files.writeString(Paths.get(testCase + ".txt"), output.toString());

         long totalDuration = LongStream.of(duration).sum();
         System.out.println(testCase + " duration: " + totalDuration / 1000000 + " ms.");
         System.out.println(testCase + " average: " + totalDuration / number_of_requests + " ns.");
         System.out.println(testCase + ": " + gcStatistics.get());
     }

     private void warmUp(String path) {
         UUID source = UUID.randomUUID();
         IntStream.range(0, number_of_requests).forEach(i -> call(new Event(source, i), path));
         System.out.println("done with warm-up for path: " + path);
     }

     private long[] benchmark(String path) {
         long[] duration = new long[number_of_requests];

         UUID source = UUID.randomUUID();
         IntStream.range(0, number_of_requests).forEach(i -> {
             long start = System.nanoTime();
             call(new Event(source, i), path).returnResult().getResponseBody();
             duration[i] = System.nanoTime() - start;
         });
         System.out.println("done with benchmark for path: " + path);
         return duration;
     }

     private WebTestClient.BodySpec<Void, ?> call(Event event, String path) {
         return webTestClient
                 .post()
                 .uri(path)
                 .contentType(MediaType.APPLICATION_JSON)
                 .bodyValue(event)
                 .exchange()
                 .expectBody(Void.class);
     }

     private static class GCStatisticsDifferential extends GCStatistics {

         GCStatistics old = new GCStatistics(0, 0);

         public GCStatisticsDifferential() {
             super(0, 0);
             calculateIncrementalGCStats();
         }

         public GCStatistics get() {
             calculateIncrementalGCStats();
             return this;
         }

         private void calculateIncrementalGCStats() {
             long timeNew = 0;
             long countNew = 0;

             for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {

                 long count = gc.getCollectionCount();

                 if (count >= 0) {
                     countNew += count;
                 }

                 long time = gc.getCollectionTime();

                 if (time >= 0) {
                     timeNew += time;
                 }
             }

             time = timeNew - old.time;
             count = countNew - old.count;

             old = new GCStatistics(timeNew, countNew);
         }

     }

     private static class GCStatistics {
         long count, time;

         public GCStatistics(long count, long time) {
             this.count = count;
             this.time = time;
         }

         @Override
         public String toString() {
             return "GCStatistics{" +
                     "count=" + count +
                     ", time=" + time +
                     '}';
         }
     }

 }

完成路径:/event-on-the-fly的预热

使用路径:/event-on-the-fly的基准测试完成

飞行时长:42679毫秒。

准备时间:44678毫秒。

准备平均值:89357纳秒。

编制:GCStatistics{count=86,time=67}

 类似资料:
  • 我不知道如何使用 React 正确实现发布者/订阅者方案。我有一个有效的解决方案,但对我来说,实现似乎不正确: 我的问题是,我需要手动实现发布者来注册订阅者并传递事件: 然后,我有一个WorkQueue处理器(应该是消费者): 它工作得很好,但很难看。在这个取自Spring Guides的示例中,他们使用EventBus将事件从发布者路由到消费者,但当我尝试将其与处理器链接时,我得到了以下编译器错

  • 问题内容: 建议在HTML页面中使用表格(现在已经有了CSS)? 表格有什么用途?表具有哪些CSS所没有的功能? 问题答案: 一点都不。但是将表格用于表格数据。只是不要将它们用于一般布局。 但是,如果您显示表格数据(例如结果或什至是表格),请继续使用表格!

  • 问题内容: 在我的应用程序中,我正在通过PMD运行代码,它向我显示以下消息: 避免使用printStackTrace(); 请改用记录器调用。 那是什么意思? 问题答案: 这意味着您应该使用logback或log4j之类的日志记录框架,而不是直接打印异常: 您应该使用以下框架的API记录它们: 日志记录框架为您提供了很大的灵活性,例如,您可以选择是否要登录到控制台或文件-如果发现它们在某些环境中不

  • 问题内容: 我读到应该避免赞成和。我对弄乱Loop并没有信心,也没有完全理解Codex。 下面的代码是否使用?如果是,并且由于应该避免,那么您能建议一种不使用但仍然完成相同任务的方法吗? 此代码用于按随机或按价格对帖子进行排序。 。 使用此代码将链接A(随机)和链接B(价格)发布在我的菜单中。因此,网站的访问者只需单击链接即可对帖子进行排序。 问题答案: 我已经针对WPSE这个主题做了非常详细的解

  • 我不确定如何处理这个问题,http响应状态取决于我需要阅读的主体。我有这样的想法: 但为了获得阅读正文所需的状态,我看不到任何使用发布者提供的值的选项。我如何使其能够调用上面的方法,并在创建NetYoutBound时使用该状态

  • 问题内容: 我的代码有效,但是我有一个最佳实践问题:状态中有一组对象,并且用户交互一次将更改一个对象的值。 据我所知,我不应该直接更改状态,而是应该始终使用。如果我想不惜一切代价避免这种情况,我将通过迭代深度克隆数组,然后更改克隆。然后将状态设置为克隆。我认为避免改变以后会改变的状态只是降低了我的表现。 详细版本: this.state.data是对象数组。它代表论坛中的主题列表,并且收藏夹按钮将