当前位置: 首页 > 知识库问答 >
问题:

使用akka流时的事件顺序

富建章
2023-03-14

阅读akka-stream的留档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置我的问题的上下文。

def flowShape(user: User) = GraphDSL
  .create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
    implicit builder =>
      implicit chatSource =>

      import GraphDSL.Implicits._

      val messageFromOutside = builder.add(Flow[String].map {
        case msg: String => UserTextMessage(user, msg)
        case _ => InvalidMessage
      })

      val merge = builder.add(Merge[ChatMessage](2))
      // UPDATE --> this is where the change comes
      // val merge = builder.add(Concat[ChatMessage](2))

      // val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user))
      val channelActorSink = Sink.actorRef(channelActor, UserLeft(user))

      val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) }

      actorAsSource ~> merge.in(0)
      messageFromOutside.out ~> merge.in(1)
      merge ~> channelActorSink

      FlowShape(messageFromOutside.in, chatSource.out)
}

为了让事情变得简单,我使用了这个流的形状和一个非常简单的源和汇。像这样的--

val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice"))
val sink = Sink.foreach[ChatMessage] {
  case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}")
  case ul: UserLeft => println(s"${ul.user.username} just left the channel")
  case uj: UserJoined => println(s"${uj.user.username} just joined the channel")
  case _ => println(s"do not know what I just received")
}

val mychatchannel = new Channel(420, myactorsystem)

source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink)

现在,我的担忧来了。终端中打印的事件顺序根本不正常。我不知道该怎么解决。这是我得到的结果--

[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message

输出中缺少第一条消息hihi消息似乎是在UserJoin消息打印之前发送的。

我尝试通过使用actorRefWithAck(我在上面的代码中对此进行了注释)来修复它(并在消息传递方面增加了一些安全性)它给出了类似的输出。

[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message

显然,似乎正在发生的事情是源代码在发送UserJoin消息之前发送消息。我该怎么解决这个问题?从概念上讲,我想我希望UserJoin消息在源代码具体化之后,但在它实际发送第一条消息之前就被发送。可能吗?

谢谢

共有1个答案

闾丘玮
2023-03-14

把溪流想象成水管:有水就会流。合并运算符不关心元素来自哪一侧。如果你想订购这些输入,你需要用Concat告诉Akka。

 类似资料:
  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法: 我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。 以下想法并没有向我解释: null

  • 关于如何解决此错误的建议,以便我可以使用最新版本的akka、akka streams和akka HTTP?谢了!

  • 我刚刚开始使用C#WPF表单,遇到了一个小问题,我找不到解决方法。 我有一个按钮,目前只是检查一个文件夹是否存在。 正确的流程是; 写入文本框以表示进程已启动。 执行该过程。 写入文本框以表示进程已完成(是否成功) 实际发生的情况是,当按下按钮时,所有操作都完成了,然后写入textbox。 有没有一种方法让文本框在每个过程结束时填充--计划是在一个按钮上执行多个操作,最终按下并记录每一个操作--最

  • TL;DR:目前保证Flink中事件时间顺序的最佳解决方案是什么? 我使用Flink 1.8.0和Kafka 2.2.1。我需要通过事件时间戳保证事件的正确顺序。我每隔1秒生成周期性水印。我使用Flink Kafka消费者与AscendingTimestampExtractor: 然后处理: 我意识到,对于在同一毫秒或几毫秒之后发生的无序事件,Flink不会纠正顺序。我在文档中发现: 水印触发所有

  • 我有一个练习要解决。我有一个Fox类,它有名称和颜色字段。我的练习是根据颜色找出狐狸的频率。 因此,我创建了一个HashMap,其中String属性将是fox名称,整数将是事件本身: 这样做之后,我一直在尝试用流编写代码,但我很难做到这一点。我写了这样的东西: ,狐狸是一个列表。 我的问题基本上是语法。我想做一件事,如果颜色没有出现,那么 其他的 我应该如何把它放在一起?