{
...,
"origin":"blockchain.info"
}
import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val incoming: Sink[Message, Future[Done]] =
Flow[Message].mapAsync(4) {
case message: TextMessage.Strict =>
println(message.text)
Future.successful(Done)
case message: TextMessage.Streamed =>
message.textStream.runForeach(println)
case message: BinaryMessage =>
message.dataStream.runWith(Sink.ignore)
}.toMat(Sink.last)(Keep.right)
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
// TODO not working integrating kafka here
// .map(_.toString)
// .map { elem =>
// println(s"PlainSinkProducer produce: ${elem}")
// new ProducerRecord[Array[Byte], String]("topic1", elem)
// }
// .runWith(Producer.plainSink(producerSettings))
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
system.terminate
}
}
// kafka that works / writes dummy data
val done1 = Source(1 to 100)
.map(_.toString)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
一个问题是传入
阶段,它被建模为接收器
。在这里它应该被建模为流
。然后把信息输入Kafka。
因为传入的文本消息可以streamed
。您可以按照以下方式使用FlatMapMerge
组合子,以避免在内存中存储整个(可能很大的)消息:
val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
case msg: BinaryMessage =>
msg.dataStream.runWith(Sink.ignore)
Future.successful(None)
case TextMessage.Streamed(src) =>
src.runFold("")(_ + _).map { msg => Some(msg) }
}.collect {
case Some(msg) => msg
}
在这一点上,您得到了一个产生字符串的东西,并且可以连接到Kafka:
val addOrigin: Flow[String, String, NotUsed] = ???
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.via(incoming)
.via(addOrigin)
.map { elem =>
println(s"PlainSinkProducer produce: ${elem}")
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.toMat(Producer.plainSink(producerSettings))(Keep.both)
.run()
我正试图实现类似的图表。例如,三月份通过卡和现金(40美元现金和38美元卡支付)收取的出租车运输付款。我需要用主色和主色的浅色版本显示那个栏。我有两个问题什么样的图表能满足我的需求?如何制作具有相同颜色(深蓝色和浅蓝色)两种不同色调的酒吧? 我已经尝试了以下代码,我确信数据集没有像我前面解释的那样包括卡和现金选项。 谢谢你。
我想写一个GraphStage,可以通过发送另一个演员的消息来暂停/取消暂停。 下面截取的代码显示了一个简单的,它会生成随机数。当舞台具体化时,会向主管发送一条包含的消息(在中)。主管保留舞台的,因此可用于控制舞台。 这是supervisor actor的一个非常简单的实现: 我正在使用以下应用程序运行流: 当应用程序以“暂停”状态启动阶段ia并且不产生任何数字时。当我按下键时,我的舞台开始发出数
我正在创建一个自定义的ImageView,它将我的图像裁剪成一个六边形形状,并添加一个边框。我想知道我的方法是正确的还是我的做法是错误的。有一堆自定义库已经这样做了,但没有一个开箱即用的形状,我正在寻找。话虽如此,这更多的是一个关于最佳实践的问题。 您可以在这个要点中看到完整的类,但主要问题是这是否是最好的方法。我觉得不对,部分原因是一些神奇的数字,这意味着它可能会在某些设备上搞砸。 下面是代码的
我已经浏览了android中的MPAndroidChart库:https://github.com/PhilJay/MPAndroidChart 它非常强大的库,用于在android中实现Graph和Chart。 我必须显示条形图,如下所示: 我所取得的成就是: 问题是,我是否可以将这些条自定义为如上图所示。i、 e.圆角形状?
这是柱状图的图片。正如您所看到的,存在对齐问题,条形图没有与标签对齐,尤其是在中间部分。此外,我希望底部轴显示10的条形图,而不是1.2、1.4、1.6等,因为不会有任何小数,所以它没有用处。我还希望每个条的值在末尾显示为一个数字,以显示每个条的总计数。 图表https://imgur.com/gallery/ThHx1eJ图片 样式设置
上一篇 讲了如何自定义一个跟随节点移动并缩放的 Gizmo,这篇我们将实现一个可以编辑的 Gizmo 1、在 资源管理器的 CustomComponent 脚本中定义 offset: properties: { //... offset: cc.Vec2 }, 2、将 custom-gizmo.js 改为以下内容并保存: let ToolType = { None: 0,