当前位置: 首页 > 知识库问答 >
问题:

使用Akka流分区时,我已经“连接”了

冯野
2023-03-14

我正在尝试Akka流API,但我不知道为什么它会抛出java。lang.IllegalArgumentException。

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(source, sink)
      ((source, sink) => Seq(source, sink)) {
        implicit b => (source, sink) =>
          Import akka.stream.scaladsl.GraphDSL.Implicits._
          val partition = b.add(Partition[(KinesisRecord)](2, flow => {
            1
          }))

          source ~> partition.in

          partition.out(0) ~> sink
          partition.out(1) ~> sink

          ClosedShape
      })

这是当前的代码。错误如下

[info] - should consume *** FAILED ***
[info] java.lang.IllegalArgumentException: [Map.in] is already connected
[info] at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1567)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater(Graph.scala:1730)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater$(Graph.scala:1729)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1784)
[info] at akka.stream.scaladsl.GraphApply.create(GraphApply.scala:46)
[info] at akka.stream.scaladsl.GraphApply.create$(GraphApply.scala:41)
[info] at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:1529)

我使用KinesRecord作为源代码的目标。但是,在这段代码中,如果我将outputPorts更改为1并删除

partition.out(1) ~> sink

这条线,它起作用了。

我不知道我是错过了什么,还是只是一个错误。

共有1个答案

阮喜
2023-03-14

我在我的环境中复制了您的错误并使用解决方案实现。但我使用的是Int源代码,而不是Kinesis源代码。您可以将其替换为您的数据类型,它可能会工作。

import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object AkkaStreamWithKinesis extends App {
  implicit val system = ActorSystem("AkkaStreamWithKinesisSystem")

  val source = Source(1 to 1000).throttle(5, 1 second)
  val sink = Sink.foreach[Int](println(_))

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(source, sink)
    ((source, sink) => Seq(source, sink)) {
      implicit builder =>
        (source, sink) =>
          import akka.stream.scaladsl.GraphDSL.Implicits._
          val partition = builder.add(Partition[Int](2, flow => {
            1
          }))
          val merge = builder.add(Merge[Int](2))

          source ~> partition.in
          partition.out(0) ~> merge.in(0)
          partition.out(1) ~> merge.in(1)
          merge.out ~> sink

          ClosedShape
    }).run()
}

输出:

09:15:36.443 [AkkaStreamWithKinesisSystem-akka.actor.default-dispatcher-6] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
1
2
3
4
5
6
7
8
9
10
11
12
 类似资料:
  • 我正在尝试Akka Stream API,我不知道为什么这会抛出java.lang.IllegalArgumentException:[Partition.in]已经在第5行连接 但如果我将validationPartitioner改为用builder包装。添加(…)然后移除。从 一切正常。如果我只是删除. in代码不编译。为什么强制使用构建器,我是否遗漏了什么或者是错误?

  • 我有一个akka流,我有一个ADT的形式。 现在我有一个这个消息处理程序流和一个那个消息处理程序流。我有一个接受消息类型的入口流。 为了创建拆分,我有以下分区器。我有以下分区器函数的定义。 我希望使用上述方法,并使用类型params中的ADT来初始化分区程序。 编译器抛出这个错误。 据我所知,分区对象只有Inlet(在本例中为A,参数化类型。 有人知道我该怎么解决这个问题吗?

  • 我的问题是,我有一个未知的组数,如果mapAsync的并行数少于我得到的组数,并且在最后一个接收器中出错 由于上游错误(Akka.Stream.Impl.StreamSubscriptionTimeoutSupport$$Anon$2),正在拆除SynchronousFileSink(/users/sam/dev/projects/akka-streams/target/log-error.txt

  • 走这条路对吗?如果没有,流量控制该怎么做?

  • 我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法: 我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。 以下想法并没有向我解释: null

  • 阅读akka-stream的留档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置我的问题的上下文。 为了让事情变得简单,我使用了这个流的形状和一个非常简单的源和汇。像这样的-- 现在,我的担忧来了。终端中打印的事件顺序根本不正常。我不知道该怎么解决。这是我得到的结果-- 输出中缺少第一条消息。消息似乎是在打印之前发送的。 我尝试通过使用(我在上面的代码中对此进