当前位置: 首页 > 知识库问答 >
问题:

对流的进出口使用单个参与者

卫振
2023-03-14

我开始探索akka-stream API,并希望对其进行测试。我有一个特定的用例,我似乎找不到合适的流式API来使用。

这个想法与以下内容相似,但有一点扭曲:

资料来源(1至10)。mapAsync(num)=

在这种情况下,ask将在未来返回一条消息。我想为发送给参与者的每个请求返回多条消息。例如:

  1. ActorRef被发送请求(2)
  2. ActorRef从流中发送N条消息。
  3. ActorRef被发送请求(3)
  4. ActorRef从流中发送M条消息。

我可以向Actor中添加额外的逻辑,以将这些额外的消息聚合/缓冲到某种类型的Iterable中,但我想知道API中是否缺少处理此类情况的内容。

在查看文档时,我最接近的方法是使用水槽。actorRefWithAck和Source。排队等候。从SinkandSourceMat,并在确认OnInit消息之前将具体化的SourceQueue传递给actorRef。这允许在通过流量处理一对多消息的同时控制上下游背压。让图表更容易理解似乎适得其反。

共有2个答案

邹坚壁
2023-03-14

不幸的是,你正在寻找的似乎并不存在。ask模式,例如 方法,只能返回1个值。因此,当你做类似

type Input = ???
type Outupt = ???

def askActorFlow(actorRef : ActorRef) : Flow[Input, Output, NotUsed] = 
  Flow[Input] mapAsync { input => (actorRef ? input).mapTo[Output] }

一次只能从ActorRef获取1个值。

您建议的工作,例如“在Actor中添加额外的逻辑以将这些额外的消息聚合/缓冲到Iterable中”,无论如何都是一个更好的解决方案。它使您的Actor用户更容易知道他们对任何查询只会得到一个响应。否则,用户将永远无法真正知道他们是否在查询后从Actor那里得到了“完整”的答案,或者是否会收到更多数据

戚锦
2023-03-14

我相信您正在寻找flatMapConcat

Source(1 to 10).flatMapConcat(num => 
  // Dummy example - outputs first 5 successors for each incoming element
  Source(num to (num + 5))
).to(Sink.foreach(println))

通过这种方式,你可以让Akka提升所有反应流的重量,并且不让任何一个赤裸裸的参与者污染你的管道。

这里有文档。

 类似资料:
  • 最近,我尝试为akka参与者编写一些单元测试,以测试参与者消息流。我在测试中观察到一些奇怪的行为: 下一个 在我的代码中,我有: 基本上,有时(很少)这样的测试失败(在另一个操作系统上),并且抛出processMessage方法的异常(由于业务逻辑导致的IllegalStateException)。

  • 如果函数接口有指针参数,既可以把指针所指向的数据传给函数使用(称为传入参数),也可以由函数填充指针所指的内存空间,传回给调用者使用(称为传出参数),例如strcpy的函数原型为 char *strcpy(char *dest, const char *src); 其中src参数是传入参数,dest参数是传出参数。有些函数的指针参数同时担当了这两种角色,如select函数。其函数原型为: int

  • 我有一个BigDecimals(在本例中是)的集合,希望将其添加在一起。有没有可能使用流来实现这一点? 我注意到类有几个方法 每一个都有一个方便的方法。但是,正如我们所知,和算术几乎总是一个坏主意。 那么,有没有方便的方法来求和BigDecimals呢? 这是我目前掌握的代码。 为后人编辑后答案: 这两个答案都非常有用。我想补充一点:我的实际场景不涉及原始的集合,它们包装在发票中。但是,我能够修改

  • 我目前有一个简单的TextMessage Source,它向我的Websocket客户端流发送消息,如下所示: 因此,我目前有一个Source类型的源代码[TextMessage.Strict,NotUsed],但我想使用注释掉的代码,其中我有一个ActorRef作为我的源代码。 我试过这个: 因此,当我使用ActorRef作为我的源时,我很难尝试将其放入图中。我得到了这个编译时错误: 类型不匹配

  • 问题内容: 我有以下查询: 并按预期得到以下结果: 有没有一种方法可以将上述结果复制到 另一个表中, 以便我的表看起来像这样? 我的问题是,可以预期任意数量的行,因此不确定如何遍历未知数量的行。 问题答案: 使用Eloquent或查询构建器在Laravel中进行批量插入确实很容易。 您可以使用以下方法。 在您的情况下,您已经在变量中包含了数据。

  • 问题内容: 我是使用AngularJS的新手。我有4个名为“取消”,“拒绝”,“成功”和“删除”的按钮。如果我单击任何按钮,我想显示多条消息,即,如果我单击“取消”,则显示您单击“取消”按钮的消息,依此类推。 我从下面的链接进行跟踪,但是在此示例中未使用简单的警报且未使用任何控制器。 我想要一个 在ANGULARJS指令 示例中的 模态弹出框, 这样我才能理解。我尝试用谷歌搜索它,但是没有找到简单