我正在使用来自我无法控制的java库的数据发布者。发布者库使用典型的回调设置;在库代码的某个地方(库是java,但我将在scala中描述简洁):
type DataType = ???
trait DataConsumer {
def onData(data : DataType) : Unit
}
库的用户需要编写一个实现onData
方法的类,并将其传递给DataProducer
,库代码如下所示:
class DataProducer(consumer : DataConsumer) {...}
DataProducer
有自己的内部线程我无法控制,以及随附的数据缓冲区,即每当有另一个DataType
对象要使用时调用onData
。
所以,我的问题是:如何编写一个层,将原始库模式转换/转换为akka流源对象?
提前谢谢你。
有多种方法可以解决这个问题。一种是使用ActorPublisher:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors您可以更改回调,以便它向参与者发送消息。根据回调的工作方式,您也可以使用mapAsync(将回调转换为未来)。只有当一个请求恰好产生一个回调调用时,这才有效。
回拨--
下面是创建DataConsumer
回调函数的代码,该函数将消息发送到akka流Source
。
注意:创建功能性ActorPublish比我在下面说明的要多得多。特别是,需要进行缓冲以处理DataProducer
调用onData
的速度快于Sink
发出需求信号的情况(请参阅此示例)。下面的代码只是设置“接线”。
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
回拨--
最初的问题专门要求对源代码进行回调,但如果整个流(而不仅仅是源代码)都可用,那么处理回调就更容易了。这是因为可以使用Source#ActorRef函数将流具体化为ActorRef
。例如:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)
本文向大家介绍Python实现将Excel转换成为image的方法,包括了Python实现将Excel转换成为image的方法的使用技巧和注意事项,需要的朋友参考一下 我的主要思路是: Excel -> Html -> Image 代码如下: 以上这篇Python实现将Excel转换成为image的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持呐喊教程。
问题内容: 我正在使用播放框架和Apache Kafka。 我有一个POST方法,它将消息发送到Kafka。Kafka有一个API方法 public java.util.concurrent.Future send(ProducerRecord记录,回调回调) 其中Javadoc说 异步将记录发送到主题,并在确认发送后调用提供的回调。 我正在使用play框架公开此功能。我想从Controller方
我正在使用play框架和Apache Kafka。 我有一个POST方法,它向Kafka发送消息。Kafka有一个API方法 公共java.util.concurrent。未来发送(生产记录记录、回调) 其中Javadoc说 异步发送记录到主题,并在发送被确认时调用提供的回调。 我将使用play框架公开此功能。我想返回一个<代码>promise
问题内容: 我想使用Promise,但是我有一个类似以下格式的回调API: 1. DOM加载或其他一次事件: 2.普通回调: 3.节点样式回调(“ nodeback”): 4.带有节点样式回调的整个库: 如何在promise中使用API,如何“承诺”它? 问题答案: 承诺有状态,它们从待定状态开始,可以解决: 完成 意味着计算成功完成。 拒绝 表示计算失败。 承诺返回函数 绝不应该抛出,而应该
本文向大家介绍Python实现将DOC文档转换为PDF的方法,包括了Python实现将DOC文档转换为PDF的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python实现将DOC文档转换为PDF的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的Python程序设计有所帮助。
问题内容: 我想使用Promise,但是我有一个类似以下格式的回调API: 1. DOM加载或其他一次事件: 2.普通回调: 3.节点样式回调(“ nodeback”): 4.带有节点样式回调的整个库: 如何在Promise中使用API,如何“承诺”呢? 问题答案: 承诺具有状态,它们从待定状态开始,可以解决: fulfilled成 意味着计算成功完成。 fulfilled 表示计算失败。 承