当前位置: 首页 > 知识库问答 >
问题:

更改akka streams的源数据

米俊喆
2023-03-14

我正在学习JavaAkka流,并使用https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html定义了以下内容:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

public class SourceExample {

    static ActorSystem system = ActorSystem.create("SourceExample");

    public static void main(String args[]) throws ExecutionException, InterruptedException {

        final List<Integer> sourceData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        final Source<Integer, NotUsed> source =
                Source.from(sourceData);
        final Sink<Integer, CompletionStage<Integer>> sink =
                Sink.<Integer, Integer>fold(0, (agg, next) -> agg + next);

        final CompletionStage<Integer> sum = source.runWith(sink, system);

        System.out.println(sum.toCompletableFuture().get());
    }

}

运行此代码的行为符合预期。

Akka Streams正在解决这个代码可以重复执行的问题。

在现实场景中,sourceData不会是静态的,Akka Streams对如何处理不断变化的数据有意见吗,还是由开发人员决定?

在最简单的情况下,只需在源数据更改时每X分钟重新执行一次流流(例如使用计划的任务)。还是Akka流是长期存在的,源数据更改并且流计算是根据某些参数重新执行的?

Akka Streams留档定义了多个数据源,但我不明白如何使用Akka Streams来处理不断变化的源数据。

共有1个答案

苏边浩
2023-03-14

Akka Streams可以而且经常运行到(在不久之前)你的应用停止。例如,流消费(例如,使用来自Alpakka Kafka的Kafka消费源)Kafka记录通常在应用程序中很早就开始,直到应用程序被杀死才停止。

为了详细说明,流一直运行到:

  • 阶段信号完成(例如,在您的示例中,Source.from将在发出10后发出完成信号)
  • 阶段失败(通常引发异常)

对动态数据有用的示例源代码是source(不引入Alpakka或Akka-HTTP)。队列,具体化为一个队列,队列中的元素可供流使用。

 类似资料:
  • 问题内容: 我有一个Spring应用程序,我想动态更改数据源。当输入DS URL时,Spring Bean和所有依赖项将自动更新。我知道这有些奇怪,但是无论如何我都想实现。我的Spring配置如下: 问题是: JDBC URL存储在属性中,可以在运行时更改它。 更改URL后,我需要重新创建数据源,可能还需要重新创建相关对象。我不知道Spring如何优雅地做呢? 我知道Spring确实可以基于一个键

  • 我有一个Spring应用程序,我想动态更改数据源,即。当输入DS URL时,Spring bean和所有依赖项将自动更新。我知道这有点奇怪,但无论如何我想实现这一点。我的Spring配置如下: 问题是: > 一旦URL被更改,我需要重新创建数据源,可能还有依赖对象。我不知道如何在Spring优雅地做这件事? 我知道Spring可以基于一个键动态路由数据源,但数据源URL是在Spring中预定义的,

  • 我最近开始使用Azure CosmosDB和函数。在阅读文档 https://docs.microsoft.com/pl-pl/azure/cosmos-db/change-feed-processor 时,我发现了一些对我来说很难理解的东西。是否真的可以在许多函数之间共享更改源,以便它们将由一个相同的数据库操作触发?什么是租约收集,它解决了什么问题。租赁的目的是什么?我想对这些术语进行基本解释。

  • 问题内容: 我们需要将某些列的数据类型从int更改为bigint。不幸的是,其中一些表很大,大约有7-10百万行(但不宽)。 Alter表alter列将永远保留在这些表上。有没有更快的方法来实现这一目标? 问题答案: 巧合的是,大约3个小时前,我不得不做一些非常相似的事情。该表是3500万行,它相当宽,并且花了很多时间才能做到这一点: 这就是我最终得到的结果: 这次,这些陈述几乎是即时的。(在速度

  • 每次我们刚开机时,电脑的右上角上都会浮现出能源之星的画面(如图1),只有少数主板和某些品牌机才会显示自己的品牌标志。我们能不能也像品牌机一样更新自己主板的BIOS,让它一开机就显示我们自己喜爱的标志而不用每天面对那千篇一律的能源之星呢?其实,要想实现上述想法,你只需三件有利工具就可以轻松搞定。这三样工具就是CBROM、BMP2EPA和AWDFLASH。下面我就逐步介绍修改过程。 1.主修改程序CB

  • 我想更改我的Android项目的设置,这样源代码就不会从[project_name]/app/src中获取,而是可以将其设置为从外部文件夹中获取。 项目结构原因是我想要共享我的src文件夹,所以我想在我的本地计算机上的Dropbox文件夹下定位。 有没有办法在Android Studio改变一个Android项目的SRC文件夹的路径?(在《月蚀》中,这是可能的) 以下是在Eclipse中如何完成的