当前位置: 首页 > 知识库问答 >
问题:

阿卡流-如何从多个SQS源流

齐志勇
2023-03-14

这是 Akka 流 - 根据流中的元素选择接收器的后续帖子。

假设我有多个SQS队列,我想从这些队列中进行流式传输。我使用Alpakka的AWS SQS连接器来创建< code >源。

implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))

现在,我想将合并源代码以合并它们。但是,Source.combine方法不支持将列表作为参数传递,而只支持变量。

def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])

当然,我可以用手指输入所有的源参数。但是,如果我有10个源队列,这个参数会变得很长。

有没有一种方法可以将源列表中的源组合起来?

【补充】

正如拉蒙·J·罗梅罗·维吉尔(Ramon J Romero y Vigil)所指出的,保持“薄贴面”是更好的做法。然而,在这种特殊情况下,我使用单个sqsClient来初始化所有SqsSource

共有1个答案

谯志诚
2023-03-14

您可以使用foldLeft连接或合并源:

val sources: List[Source[Message, NotUsed]] = ???

val concatenated: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ ++ _)
// the same as sources.foldLeft(Source.empty[Message])(_ concat _)

val merged: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ merge _)

或者,您可以将Source.zipNflatMapConcat一起使用:

val combined: Source[Message, NotUsed] = Source.zipN(sources).flatMapConcat(Source.apply)
 类似资料:
  • 如果有一个源应用程序(名为a-source),它有多个通道来发送消息,例如channelA。destination=b主题,channelB。destination=c主题。b-topic的接收者是b-link,c-topic的接受者是c-Sink。我如何构造我的流,像这样描述它们:A|B和A|C?如果是这样的话,我认为我的A源代码的一部分在每个流中都是有用的。 所以我的问题是:SCDF流DSL如

  • 如何为每个作业设置流程参数?我试图配置一个自定义警报程序,我希望每个作业都触发它。它看起来像是在从流参数中寻找“alert.type”属性,但现在我只能通过接口触发它。有什么想法吗?

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 假设我有几个,我会将它们放入另一个列表或其他集合中,所以在调用之前,我不知道我有多少个 应该是:

  • 问题内容: 我想确定显示点集合所需的最小面积。最简单的方法是像这样遍历整个集合: 我开始了解流。为此,您可以执行以下操作: 两者给出相同的结果。但是,尽管流方法很优雅,但速度却慢得多(如预期的那样)。 有没有办法让,,并在一个单一的流操作? 问题答案: 与类推,创建一个收集所需信息的类。它定义了两种方法:一种用于记录a的值,一种用于组合两个的值。 然后,您可以将收集到中。 更新 我完全对 OP得出

  • 在Akka Streams的早期版本中,返回了一个的,可以具体化为。 在Akka Streams 2.4中,我看到返回一个——我不清楚如何使用它。我需要应用于流的转换必须使整个可用,所以我不能只

  • 我有三个SQS FIFO队列,其中每个队列在EC2实例中都有一个数据投影侦听器守护进程,作为docker pod(SQL Server、PostgreSQL、Elastic Search等) 所有队列的设置如下(死信队列稍后设置)。 这都是我使用DynamoDB流设计的事件源架构的一部分= 启用以避免队列中的重复消息,因为Lambda路由器中的任何队列总是可能出现错误。 现在,我还将每条消息的设置

  • 问题内容: 我在SQS中有多个消息。即使有数十个可见代码(不在运行中),以下代码也 始终 仅返回一个。 我认为setMaxNumberOfMessages允许一次使用多个..我误解了吗? 我也尝试使用withMaxNumberOfMessages,但没有任何运气: 我怎么知道队列中有消息?大于1? 上面的代码总是在运行之前给我> 1 感谢您的输入 问题答案: AWS API参考指南:Query /