当前位置: 首页 > 知识库问答 >
问题:

如何访问Akka流的计算结果?

袁宜民
2023-03-14

我正在尝试返回一个流操作的结果,在本例中为:

  1. 求和列表
  2. 取值平方
  3. 取值平方

其表示为:

        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)

若要访问我使用的值,请执行以下操作

final AtomicInteger returnValue = new AtomicInteger();

紧随其后的是:

        .to(Sink.foreach(x -> {
            returnValue.set(x);
            System.out.println("got: " + x);
        }))

这需要一个阻塞调用来允许流完成,这是不可接受的:

Thread.sleep(2000);

如果我使用:

    CompletableFuture<Object> futureValue =
            ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
    System.out.println(futureValue.toCompletableFuture().get().toString());

返回错误:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

在本例中,收件人执行元是源,并在done.done消息中返回以下内容:

return Optional.of(CompletionStrategy.immediately());

Akka流可以用来返回流中的计算值吗?唯一的替代方法是将计算值存储在DB中,或者当在以下位置计算值时将其发送到Kafka主题:

.to(Sink.foreach(x -> {

完成SRC:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String args[]) throws InterruptedException, ExecutionException {


        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            }
                            else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();

        final ActorRef actorRef = source
                .fold(0, (aggr, next) -> aggr + next)
                .map(x -> x * x)
                .map(x -> x * x)
                .to(Sink.foreach(x -> {
                    returnValue.set(x);
                    System.out.println("got: " + x);
                }))
                .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);

        System.out.println("returnValue is "+returnValue);

    }
}

共有1个答案

秦承安
2023-03-14

我想你可能缺少的是理解Akka Streams中物化价值的概念。浏览文档的这一部分,特别是关于组合物化值的部分。我也尝试在这里解释这个概念(物化价值搜索)。如果你追求物化的价值,那么也许我在这里写的东西会更有意义。

调用source.actorref(..)返回source ,其中T是流经流的元素的数据类型(在您的示例中是integer),而actorref是该source的物化值。当对runnablegraph调用run时,您可以同步获得物化值,这是to(...)call返回的值。

actorref是如何按照source.actorref(...)语义“驱动”流的。

现在的问题是如何获得通过流的数据。在您的示例中,将所有整数缩减为一个,因此可以使用sink.head,而不是使用sink.foreach(...)(这对副作用有好处)。您看到,sink还可以生成物化值,对于sink.head来说,它物化为流中第一个元素的completionstage,在您的情况下,该元素是唯一的元素。让我们试试:

final ActorRef actorRef = source
                                .fold(0, (aggr, next) -> aggr + next)
                                .map(x -> x * x)
                                .map(x -> x * x)
                                .to(Sink.head())
                                .run(system);

好吧,那没有太大帮助。您仍然只能获得的物化值。要获得接收器的物化值,我们需要显式地请求它:

final Pair<ActorRef, CompletionStage<Integer>> matVals =
      source
        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)
        .toMat(Sink.head(), Keep.both())
        .run(system);

现在,我们得到了宿的物化值。您可以像前面一样通过actorref驱动流:

final ActorRef actorRef = matVals.first();

Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());

您还可以使用completablestageAPI从流中获取值。像这样说:

Integer folded = matVals.second().toCompletableFuture().join(); 

是的,这是阻塞,但您需要在流运行到完成之前以某种方式阻止主线程完成。

 类似资料:
  • 我正在学习与Akka溪流一起工作,并且真的很喜欢它,但是物化部分对我来说仍然有些神秘。 引用自http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api ...通过对池客户端流具体化到的HostConnectionPo

  • 使用指南 - 数据报告 - 流量分析 - 访问时长的计算 访问时长指访客每次在网站访问所停留的时长,即从进入第一个页面到离开最后一个页面的时长。 在传统统计工具下,最后一个页面的关闭时间很难得到,百度统计在技术上进行了升级,能够获取到该页面的关闭时间。 然而用户行为具有多样性,当用户快速关闭浏览器、长时间未对页面进行操作或其它网络原因导致的时候,系统会无法获取到页面的关闭信息,从而使最后一个页面的

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 准备数据: CREATE TABLE t1 (year YEAR(4), month INT(2) UNSIGNED ZEROFILL, day INT(2) UNSIGNED ZEROFILL); INSERT INTO t1 VALUES(2000,1,1),(2000,1,20),(2000,1,30),(2000,2,2), (2000,2

  • 我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?

  • 问题内容: 我正在为用户帖子构建一个访客计数系统,以显示首页上浏览量最高的帖子。我现在有一个访客计数系统,但是所有页面刷新后都会注册一个视图。我无法使用Google Analytics(分析)。 我需要的是一个访客计数器,该计数器只计算不重复的访客。就我而言,独特意味着一个人一天只能浏览一个帖子?我认为甚至一个星期都可以。你可以在这里写那个php代码吗?如果您也喜欢的话,也可以给我链接一些优秀的教