当前位置: 首页 > 知识库问答 >
问题:

如何将alpakka kafka与akka stream websocket结合起来

乐刚毅
2023-03-14

我使用https://doc.akka.io/docs/alpakka-kafka/current/consumer.html从kafka使用数据,如下所示:

implicit val system: ActorSystem = ActorSystem("SAPEVENTBUS")
implicit val materializer: Materializer = ActorMaterializer()

val config = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings =
  ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("SAP-BUS")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")

val kafkaConsumer =  Consumer
  .plainSource(
    consumerSettings,
    Subscriptions.topics("SAPEVENTBUS"))
  .toMat(Sink.foreach(println))(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)  

接下来,我将通过akka http websocket客户端将收到的结果转发到webserver

以下是如何构建websocket客户端:

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
  Sink.foreach {
    case message: TextMessage.Strict =>
      println(message.text)
  }

val helloSource: Source[Message, NotUsed] =
  Source.single(TextMessage("hello world!"))

// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
  Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] representing the stream completion from above
val (upgradeResponse, closed) =
  Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

val connected = upgradeResponse.map { upgrade =>
  // just like a regular http request we can access response status which is available via upgrade.response.status
  // status code 101 (Switching Protocols) indicates that server support WebSockets
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Done
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

我有两个问题:

>

  • 如何将消费者和websocket客户端组合成一个流,并让它将消息发送到Web服务器。

    我想根据内容将从Web服务器接收到的消息广播到两个接收器中。

    如何构建这样的图?

  • 共有1个答案

    谢财
    2023-03-14

    如果您计划将所有Kafka消息推入web套接字,而不进行响应处理,则应该在输入和输出没有逻辑连接的真正双向场景中编写web套接字的消息处理程序:

    //Kafka reading logic
    val kafkaSource: Source[ConsumerRecord[String, String], Consumer.Control] = Consumer
        .plainSource(consumerSettings, Subscriptions.topics("SAPEVENTBUS"))
    //kafka message serialization logic
    val kafkaRecordToMessageTransform: Flow[ConsumerRecord[String, String], Message, NotUsed] =
        Flow[ConsumerRecord[String, String]].map[Message](consumerRecord => {
            TextMessage.Strict(s"${consumerRecord.key} - ${consumerRecord.value}")
        })
    
    //web socket's messages sending logic
    val webSocketWriteLogic: Source[Message, Consumer.Control] =
        kafkaSource.via(kafkaRecordToMessageTransform)
    
    //web socket's messages receiving logic
    val webSocketReadLogic: Sink[Message, NotUsed] = Flow[Message].mapAsync[String](1)({
        case textMessage: TextMessage =>
            textMessage.toStrict(collectTimeout).map(_.text)
        case binaryMessage: BinaryMessage =>
            binaryMessage.toStrict(collectTimeout).map(_.data.toString())
    }).to(Sink.foreach[String](messageText => println(s"received $messageText")))
    
    //web socket's logic
    val webSocketLogic: Flow[Message, Message, Consumer.Control] =
        Flow.fromSinkAndSourceMat(webSocketReadLogic, webSocketWriteLogic)(Keep.right)
    

    根据分区阶段的某些条件,您可以将流消息广播到多个接收器中。还有,你可以查看这个解释。

     类似资料:
    • 我想在React项目的后端合并一个使用语音识别的Python文件。 这里我有一段使用语音识别的Python代码: 我尝试在react组件中导入文件,如下所示: 在这里,我尝试将我的按钮链接到Python文件中的函数: 我收到了它未能编译的消息以及以下内容: 我能做些什么来使这个工作?

    • 为了能够轻松地测试我的SOAP客户机(通过利用MockWebServiceServer),我想使用Spring的WebServiceTemplate。 SOAPendpoint是.NET服务器。基于wsdl,我能够生成bean(主要是请求类)和endpoint接口。 生成的界面如下: SOAPFaultClientException:反序列化操作“CallCheckXML”的请求消息正文时出错。O

    • 问题内容: 我已经使用Python asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。 然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。 现在,我想定期检查该结构到磁盘的位置,最好使用pickle。 由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上

    • 问题内容: 如何创建使芹菜任务看起来像的包装器?还是有更好的方法与Celery集成? Celery的创建者@asksol这样说: 将Celery用作异步I / O框架之上的分布式层是很常见的(提示:将CPU绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。 但是我找不到任何专门针对框架的代码示例。 问题答案: 如官方网站上所述,这可以通过Celery 5.0版实现: htt

    • 我有一个(Spring开机/Spring云)应用程序(微服务'MS'架构)与Netflix工具构建,我想将其部署在kubernetes集群(一个主和2个小跟班),以从其编排事实中获得优势。 顺便说一下,我在集群上创建了一个库贝-dns服务,我还尝试用3个Pod挂载一个eureka服务(名为eurekaservice)。另一方面,我运行了一个带有下一个eureka配置的微服务: 好消息是集群上的每个