我使用https://doc.akka.io/docs/alpakka-kafka/current/consumer.html从kafka使用数据,如下所示:
implicit val system: ActorSystem = ActorSystem("SAPEVENTBUS")
implicit val materializer: Materializer = ActorMaterializer()
val config = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings =
ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("SAP-BUS")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
val kafkaConsumer = Consumer
.plainSource(
consumerSettings,
Subscriptions.topics("SAPEVENTBUS"))
.toMat(Sink.foreach(println))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
接下来,我将通过akka http websocket客户端将收到的结果转发到webserver
以下是如何构建websocket客户端:
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] representing the stream completion from above
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)
val connected = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
我有两个问题:
>
如何将消费者和websocket客户端组合成一个流,并让它将消息发送到Web服务器。
我想根据内容将从Web服务器接收到的消息广播到两个接收器中。
如何构建这样的图?
如果您计划将所有Kafka消息推入web套接字,而不进行响应处理,则应该在输入和输出没有逻辑连接的真正双向场景中编写web套接字的消息处理程序:
//Kafka reading logic
val kafkaSource: Source[ConsumerRecord[String, String], Consumer.Control] = Consumer
.plainSource(consumerSettings, Subscriptions.topics("SAPEVENTBUS"))
//kafka message serialization logic
val kafkaRecordToMessageTransform: Flow[ConsumerRecord[String, String], Message, NotUsed] =
Flow[ConsumerRecord[String, String]].map[Message](consumerRecord => {
TextMessage.Strict(s"${consumerRecord.key} - ${consumerRecord.value}")
})
//web socket's messages sending logic
val webSocketWriteLogic: Source[Message, Consumer.Control] =
kafkaSource.via(kafkaRecordToMessageTransform)
//web socket's messages receiving logic
val webSocketReadLogic: Sink[Message, NotUsed] = Flow[Message].mapAsync[String](1)({
case textMessage: TextMessage =>
textMessage.toStrict(collectTimeout).map(_.text)
case binaryMessage: BinaryMessage =>
binaryMessage.toStrict(collectTimeout).map(_.data.toString())
}).to(Sink.foreach[String](messageText => println(s"received $messageText")))
//web socket's logic
val webSocketLogic: Flow[Message, Message, Consumer.Control] =
Flow.fromSinkAndSourceMat(webSocketReadLogic, webSocketWriteLogic)(Keep.right)
根据分区阶段的某些条件,您可以将流消息广播到多个接收器中。还有,你可以查看这个解释。
我想在React项目的后端合并一个使用语音识别的Python文件。 这里我有一段使用语音识别的Python代码: 我尝试在react组件中导入文件,如下所示: 在这里,我尝试将我的按钮链接到Python文件中的函数: 我收到了它未能编译的消息以及以下内容: 我能做些什么来使这个工作?
为了能够轻松地测试我的SOAP客户机(通过利用MockWebServiceServer),我想使用Spring的WebServiceTemplate。 SOAPendpoint是.NET服务器。基于wsdl,我能够生成bean(主要是请求类)和endpoint接口。 生成的界面如下: SOAPFaultClientException:反序列化操作“CallCheckXML”的请求消息正文时出错。O
问题内容: 我已经使用Python asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。 然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。 现在,我想定期检查该结构到磁盘的位置,最好使用pickle。 由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上
问题内容: 如何创建使芹菜任务看起来像的包装器?还是有更好的方法与Celery集成? Celery的创建者@asksol这样说: 将Celery用作异步I / O框架之上的分布式层是很常见的(提示:将CPU绑定的任务路由到prefork worker意味着它们不会阻塞事件循环)。 但是我找不到任何专门针对框架的代码示例。 问题答案: 如官方网站上所述,这可以通过Celery 5.0版实现: htt
我有一个(Spring开机/Spring云)应用程序(微服务'MS'架构)与Netflix工具构建,我想将其部署在kubernetes集群(一个主和2个小跟班),以从其编排事实中获得优势。 顺便说一下,我在集群上创建了一个库贝-dns服务,我还尝试用3个Pod挂载一个eureka服务(名为eurekaservice)。另一方面,我运行了一个带有下一个eureka配置的微服务: 好消息是集群上的每个