当前位置: 首页 > 知识库问答 >
问题:

akka流自定义图形阶段

喻元龙
2023-03-14
{
...,
"origin":"blockchain.info"
}
    null
import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val incoming: Sink[Message, Future[Done]] =
    Flow[Message].mapAsync(4) {
      case message: TextMessage.Strict =>
        println(message.text)
        Future.successful(Done)
      case message: TextMessage.Streamed =>
        message.textStream.runForeach(println)
      case message: BinaryMessage =>
        message.dataStream.runWith(Sink.ignore)
    }.toMat(Sink.last)(Keep.right)

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))

val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .toMat(incoming)(Keep.both)
      // TODO not working integrating kafka here
      // .map(_.toString)
      //    .map { elem =>
      //      println(s"PlainSinkProducer produce: ${elem}")
      //      new ProducerRecord[Array[Byte], String]("topic1", elem)
      //    }
      //    .runWith(Producer.plainSink(producerSettings))
      .run()

val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      system.terminate
    }
  }

// kafka that works / writes dummy data
val done1 = Source(1 to 100)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

共有1个答案

南宫兴德
2023-03-14

一个问题是传入阶段,它被建模为接收器。在这里它应该被建模为。然后把信息输入Kafka。

因为传入的文本消息可以streamed。您可以按照以下方式使用FlatMapMerge组合子,以避免在内存中存储整个(可能很大的)消息:

  val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
    case msg: BinaryMessage =>
      msg.dataStream.runWith(Sink.ignore)
      Future.successful(None)
    case TextMessage.Streamed(src) =>
      src.runFold("")(_ + _).map { msg => Some(msg) }
  }.collect {
    case Some(msg) => msg
  }

在这一点上,您得到了一个产生字符串的东西,并且可以连接到Kafka:

  val addOrigin: Flow[String, String, NotUsed] = ???

  val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .via(incoming)
      .via(addOrigin)
      .map { elem =>
        println(s"PlainSinkProducer produce: ${elem}")
        new ProducerRecord[Array[Byte], String]("topic1", elem)
      }
      .toMat(Producer.plainSink(producerSettings))(Keep.both)
      .run()
 类似资料:
  • 我正试图实现类似的图表。例如,三月份通过卡和现金(40美元现金和38美元卡支付)收取的出租车运输付款。我需要用主色和主色的浅色版本显示那个栏。我有两个问题什么样的图表能满足我的需求?如何制作具有相同颜色(深蓝色和浅蓝色)两种不同色调的酒吧? 我已经尝试了以下代码,我确信数据集没有像我前面解释的那样包括卡和现金选项。 谢谢你。

  • 我想写一个GraphStage,可以通过发送另一个演员的消息来暂停/取消暂停。 下面截取的代码显示了一个简单的,它会生成随机数。当舞台具体化时,会向主管发送一条包含的消息(在中)。主管保留舞台的,因此可用于控制舞台。 这是supervisor actor的一个非常简单的实现: 我正在使用以下应用程序运行流: 当应用程序以“暂停”状态启动阶段ia并且不产生任何数字时。当我按下键时,我的舞台开始发出数

  • 我正在创建一个自定义的ImageView,它将我的图像裁剪成一个六边形形状,并添加一个边框。我想知道我的方法是正确的还是我的做法是错误的。有一堆自定义库已经这样做了,但没有一个开箱即用的形状,我正在寻找。话虽如此,这更多的是一个关于最佳实践的问题。 您可以在这个要点中看到完整的类,但主要问题是这是否是最好的方法。我觉得不对,部分原因是一些神奇的数字,这意味着它可能会在某些设备上搞砸。 下面是代码的

  • 我已经浏览了android中的MPAndroidChart库:https://github.com/PhilJay/MPAndroidChart 它非常强大的库,用于在android中实现Graph和Chart。 我必须显示条形图,如下所示: 我所取得的成就是: 问题是,我是否可以将这些条自定义为如上图所示。i、 e.圆角形状?

  • 这是柱状图的图片。正如您所看到的,存在对齐问题,条形图没有与标签对齐,尤其是在中间部分。此外,我希望底部轴显示10的条形图,而不是1.2、1.4、1.6等,因为不会有任何小数,所以它没有用处。我还希望每个条的值在末尾显示为一个数字,以显示每个条的总计数。 图表https://imgur.com/gallery/ThHx1eJ图片 样式设置

  • 上一篇 讲了如何自定义一个跟随节点移动并缩放的 Gizmo,这篇我们将实现一个可以编辑的 Gizmo 1、在 资源管理器的 CustomComponent 脚本中定义 offset: properties: { //... offset: cc.Vec2 }, 2、将 custom-gizmo.js 改为以下内容并保存: let ToolType = { None: 0,