我们有以下架构
SQS(源)->SQS轮询器->我们的业务逻辑->Sink,它从SQS中删除消息。
这是一个akka流(我们的业务逻辑有多个阶段)。
现在我们希望通过添加HTTP服务器(而不是Akka HTTP)来扩展这个体系结构。
现在我们的服务也有了路径
我认为https://doc.akka.io/docs/akka/2.5/stream/operators/source/queue.html是这里的一个潜在解决方案,但它只返回一个句柄,以便在整个可运行图物化之后调用,因此合并SQS轮询器源和HTTP可调用源有点难看。
我认为source.queue
确实是这里的方法,将流具体化一次,并从HTTP服务器endpoint和SQS轮询器向队列提供元素。有什么特别的原因会让人觉得这很难看吗?
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
在Akka Streams的早期版本中,返回了一个的,可以具体化为。 在Akka Streams 2.4中,我看到返回一个——我不清楚如何使用它。我需要应用于流的转换必须使整个可用,所以我不能只
我需要遍历一个形状像树的API。例如,目录结构或讨论线程。它可以通过以下流程进行建模: 如何遍历这些数据?我的工作如下: 然而,由于我使用的是带有缓冲区的流,所以流永远不会完成。 上游完成且缓冲元件已排空时完成 流缓冲器 我多次阅读了图表周期、活跃度和死锁部分,但仍在努力寻找答案。 这将创建一个活动锁: 编辑:我添加了一个git repo来测试你的解决方案https://github.com/Ma
走这条路对吗?如果没有,流量控制该怎么做?
问题内容: 我想在我的应用程序中创建一个流程。但是,从Java的API环顾四周之后,我还是不太了解它。 基本上,我想创建一个多进程应用程序。但是新过程是我的应用程序中的一类。 我知道有些人可能会问为什么不创建线程?因为该类正在调用Matlab代码,所以问题出在这里,而Java类在这里 有什么办法吗? 问题答案: 只有一种方法可以用Java创建进程,基本上,它允许您像通过命令行界面一样启动新的JVM
我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt