这是 Akka 流 - 根据流中的元素选择接收器的后续帖子。
假设我有多个SQS队列,我想从这些队列中进行流式传输。我使用Alpakka的AWS SQS连接器来创建< code >源。
implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))
现在,我想将合并
源代码以合并它们。但是,Source.combine方法不支持将列表作为参数传递,而只支持变量。
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])
当然,我可以用手指输入所有的源参数。但是,如果我有10个源队列,这个参数会变得很长。
有没有一种方法可以将源列表中的源组合起来?
【补充】
正如拉蒙·J·罗梅罗·维吉尔(Ramon J Romero y Vigil)所指出的,保持“薄贴面”是更好的做法。然而,在这种特殊情况下,我使用单个sqsClient
来初始化所有SqsSource
。
您可以使用foldLeft
连接或合并源:
val sources: List[Source[Message, NotUsed]] = ???
val concatenated: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ ++ _)
// the same as sources.foldLeft(Source.empty[Message])(_ concat _)
val merged: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ merge _)
或者,您可以将Source.zipN
与flatMapConcat
一起使用:
val combined: Source[Message, NotUsed] = Source.zipN(sources).flatMapConcat(Source.apply)
如果有一个源应用程序(名为a-source),它有多个通道来发送消息,例如channelA。destination=b主题,channelB。destination=c主题。b-topic的接收者是b-link,c-topic的接受者是c-Sink。我如何构造我的流,像这样描述它们:A|B和A|C?如果是这样的话,我认为我的A源代码的一部分在每个流中都是有用的。 所以我的问题是:SCDF流DSL如
如何为每个作业设置流程参数?我试图配置一个自定义警报程序,我希望每个作业都触发它。它看起来像是在从流参数中寻找“alert.type”属性,但现在我只能通过接口触发它。有什么想法吗?
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
假设我有几个,我会将它们放入另一个列表或其他集合中,所以在调用之前,我不知道我有多少个 应该是:
问题内容: 我想确定显示点集合所需的最小面积。最简单的方法是像这样遍历整个集合: 我开始了解流。为此,您可以执行以下操作: 两者给出相同的结果。但是,尽管流方法很优雅,但速度却慢得多(如预期的那样)。 有没有办法让,,并在一个单一的流操作? 问题答案: 与类推,创建一个收集所需信息的类。它定义了两种方法:一种用于记录a的值,一种用于组合两个的值。 然后,您可以将收集到中。 更新 我完全对 OP得出
在Akka Streams的早期版本中,返回了一个的,可以具体化为。 在Akka Streams 2.4中,我看到返回一个——我不清楚如何使用它。我需要应用于流的转换必须使整个可用,所以我不能只
我有三个SQS FIFO队列,其中每个队列在EC2实例中都有一个数据投影侦听器守护进程,作为docker pod(SQL Server、PostgreSQL、Elastic Search等) 所有队列的设置如下(死信队列稍后设置)。 这都是我使用DynamoDB流设计的事件源架构的一部分= 启用以避免队列中的重复消息,因为Lambda路由器中的任何队列总是可能出现错误。 现在,我还将每条消息的设置
问题内容: 我在SQS中有多个消息。即使有数十个可见代码(不在运行中),以下代码也 始终 仅返回一个。 我认为setMaxNumberOfMessages允许一次使用多个..我误解了吗? 我也尝试使用withMaxNumberOfMessages,但没有任何运气: 我怎么知道队列中有消息?大于1? 上面的代码总是在运行之前给我> 1 感谢您的输入 问题答案: AWS API参考指南:Query /