当前位置: 首页 > 知识库问答 >
问题:

将回调方法实现转换为akka流源

湛宜春
2023-03-14

我正在使用来自我无法控制的java库的数据发布者。发布者库使用典型的回调设置;在库代码的某个地方(库是java,但我将在scala中描述简洁):

type DataType = ???

trait DataConsumer {
  def onData(data : DataType) : Unit
}

库的用户需要编写一个实现onData方法的类,并将其传递给DataProducer,库代码如下所示:

class DataProducer(consumer : DataConsumer) {...}

DataProducer有自己的内部线程我无法控制,以及随附的数据缓冲区,即每当有另一个DataType对象要使用时调用onData

所以,我的问题是:如何编写一个层,将原始库模式转换/转换为akka流源对象?

提前谢谢你。

共有2个答案

孔鹤龄
2023-03-14

有多种方法可以解决这个问题。一种是使用ActorPublisher:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors您可以更改回调,以便它向参与者发送消息。根据回调的工作方式,您也可以使用mapAsync(将回调转换为未来)。只有当一个请求恰好产生一个回调调用时,这才有效。

艾嘉石
2023-03-14

回拨--

下面是创建DataConsumer回调函数的代码,该函数将消息发送到akka流Source

注意:创建功能性ActorPublish比我在下面说明的要多得多。特别是,需要进行缓冲以处理DataProducer调用onData的速度快于Sink发出需求信号的情况(请参阅此示例)。下面的代码只是设置“接线”。

import akka.actor.ActorRef
import akka.actor.Actor.noSender

import akka.stream.Actor.ActorPublisher

/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
  def receive : Receive = {
    case message : DataType => deliverBuf() //defined in example link
  }    
}

class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
  override def onData(data : DataType) = sourceActor.tell(data, noSender)
}

//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]

//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)

//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))

//setup the incoming data feed from 3rd party library
val dataProducer  = DataProducer(actorConsumer)

回拨--

最初的问题专门要求对源代码进行回调,但如果整个流(而不仅仅是源代码)都可用,那么处理回调就更容易了。这是因为可以使用Source#ActorRef函数将流具体化为ActorRef。例如:

val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val bufferSize = 100

val streamRef = 
  Source
    .actorRef[DataType](bufferSize, overflowStrategy)
    .via(someFlow)
    .to(someSink)
    .run()

val streamConsumer = new DataConsumer {
  override def onData(data : DataType) : Unit = streamRef ! data
} 

val dataProducer = DataProducer(streamConsumer)
 类似资料:
  • 本文向大家介绍Python实现将Excel转换成为image的方法,包括了Python实现将Excel转换成为image的方法的使用技巧和注意事项,需要的朋友参考一下 我的主要思路是: Excel -> Html -> Image 代码如下: 以上这篇Python实现将Excel转换成为image的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持呐喊教程。

  • 问题内容: 我正在使用播放框架和Apache Kafka。 我有一个POST方法,它将消息发送到Kafka。Kafka有一个API方法 public java.util.concurrent.Future send(ProducerRecord记录,回调回调) 其中Javadoc说 异步将记录发送到主题,并在确认发送后调用提供的回调。 我正在使用play框架公开此功能。我想从Controller方

  • 我正在使用play框架和Apache Kafka。 我有一个POST方法,它向Kafka发送消息。Kafka有一个API方法 公共java.util.concurrent。未来发送(生产记录记录、回调) 其中Javadoc说 异步发送记录到主题,并在发送被确认时调用提供的回调。 我将使用play框架公开此功能。我想返回一个<代码>promise

  • 问题内容: 我想使用Promise,但是我有一个类似以下格式的回调API: 1. DOM加载或其他一次事件: 2.普通回调: 3.节点样式回调(“ nodeback”): 4.带有节点样式回调的整个库: 如何在promise中使用API​​,如何“承诺”它? 问题答案: 承诺有状态,它们从待定状态开始,可以解决: 完成 意味着计算成功完成。 拒绝 表示计算失败。 承诺返回函数 绝不应该抛出,而应该

  • 本文向大家介绍Python实现将DOC文档转换为PDF的方法,包括了Python实现将DOC文档转换为PDF的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python实现将DOC文档转换为PDF的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的Python程序设计有所帮助。

  • 问题内容: 我想使用Promise,但是我有一个类似以下格式的回调API: 1. DOM加载或其他一次事件: 2.普通回调: 3.节点样式回调(“ nodeback”): 4.带有节点样式回调的整个库: 如何在Promise中使用API​​,如何“承诺”呢? 问题答案: 承诺具有状态,它们从待定状态开始,可以解决: fulfilled成 意味着计算成功完成。 fulfilled 表示计算失败。 承