在以前的博文中我们介绍了Slick,它是一种FRM(Functional Relation Mapper)。有别于ORM,FRM的特点是函数式的语法可以支持灵活的对象组合(Query Composition)实现大规模的代码重复利用,但同时这些特点又影响了编程人员群体对FRM的接受程度,阻碍了FRM成为广为流行的一种数据库编程方式。所以我们只能从小众心态来探讨如何改善Slick现状,希望通过与某些Stream库集成,在Slick FRM的基础上恢复一些人们熟悉的Recordset数据库光标(cursor)操作方式,希望如此可以降低FRM数据库编程对函数式编程水平要求,能够吸引更多的编程人员接受FRM。刚好,在这篇讨论里我们希望能介绍一些Akka-Stream和外部系统集成对接的实际用例,把Slick数据库数据载入连接到Akka-Stream形成streaming-dataset应该是一个挺好的想法。Slick和Akka-Stream可以说是自然匹配的一对,它们都是同一个公司产品,都支持Reactive-Specification。Reactive系统的集成对象之间是通过公共界面Publisher来实现对接的。Slick提供了个Dababase.stream函数可以构建这个Publisher:
/** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified * `DBIOAction` and return the result directly as a stream without buffering everything first. * This method is only supported for streaming actions. * * The Publisher itself is just a stub that holds a reference to the action and this Database. * The action does not actually start to run until the call to `onSubscribe` returns, after * which the Subscriber is responsible for reading the full response or cancelling the * Subscription. The created Publisher can be reused to serve a multiple Subscribers, * each time triggering a new execution of the action. * * For the purpose of combinators such as `cleanup` which can run after a stream has been * produced, cancellation of a stream by the Subscriber is not considered an error. For * example, there is no way for the Subscriber to cause a rollback when streaming the * results of `someQuery.result.transactionally`. * * When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row * is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers * from within `onNext`. If streaming is interrupted due to back-pressure signaling, the next * row will be prefetched (in order to buffer the next result page from the server when a page * boundary has been reached). */ final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] = streamInternal(a, false)
这个DatabasePublisher[T]就是一个Publisher[T]:
/** A Reactive Streams `Publisher` for database Actions. */ abstract class DatabasePublisher[T] extends Publisher[T] { self => ... }
然后Akka-Stream可以通过Source.fromPublisher(publisher)构建Akka Source构件:
/** * Helper to create [[Source]] from `Publisher`. * * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))
理论上Source.fromPublisher(db.stream(query))就可以构建一个Reactive-Stream-Source了。下面我们就建了例子来做示范:首先是Slick的铺垫代码boiler-code:
val aqmraw = Models.AQMRawQuery val db = Database.forConfig("h2db") // aqmQuery.result returns Seq[(String,String,String,String)] val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)} // type alias type RowType = (String,String,String,String) // user designed strong typed resultset type. must extend FDAROW case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW // strong typed resultset conversion function. declared implicit to remind during compilation implicit def toTypedRow(row: RowType): TypedRow = TypedRow(row._1,row._2,row._3,row._4)
我们需要的其实就是aqmQuery,用它来构建DatabasePublisher:
// construct DatabasePublisher from db.stream val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result) // construct akka source val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)
有了dbPublisher就可以用Source.fromPublisher函数构建source了。现在我们试着运算这个Akka-Stream:
implicit val actorSys = ActorSystem("actor-system") implicit val ec = actorSys.dispatcher implicit val mat = ActorMaterializer() source.take(6).map{row => toTypedRow(row)}.runWith( Sink.foreach(qmr => { println(s"州名: ${qmr.state}") println(s"县名:${qmr.county}") println(s"年份:${qmr.year}") println(s"取值:${qmr.value}") println("-------------") })) scala.io.StdIn.readLine() actorSys.terminate()
下面是运算结果:
州名: Alabama 县名:Elmore 年份:1999 取值:5 ------------- 州名: Alabama 县名:Jefferson 年份:1999 取值:39 ------------- 州名: Alabama 县名:Lawrence 年份:1999 取值:28 ------------- 州名: Alabama 县名:Madison 年份:1999 取值:31 ------------- 州名: Alabama 县名:Mobile 年份:1999 取值:32 ------------- 州名: Alabama 县名:Montgomery 年份:1999 取值:15 -------------
显示我们已经成功的连接了Slick和Akka-Stream。
现在我们有了Reactive stream source,它是个akka-stream,该如何对接处于下游的scalaz-stream-fs2呢?我们知道:akka-stream是Reactive stream,而scalaz-stream-fs2是纯“拖式”pull-model stream,也就是说上面这个Reactive stream source必须被动等待下游的scalaz-stream-fs2来读取数据。按照Reactive-Stream规范,下游必须通过backpressure信号来知会上游是否可以发送数据状态,也就是说我们需要scalaz-stream-fs2来产生backpressure。scalaz-stream-fs2 async包里有个Queue结构:
/** * Asynchronous queue interface. Operations are all nonblocking in their * implementations, but may be 'semantically' blocking. For instance, * a queue may have a bound on its size, in which case enqueuing may * block until there is an offsetting dequeue. */ trait Queue[F[_], A] { self => /** * Enqueues one element in this `Queue`. * If the queue is `full` this waits until queue is empty. * * This completes after `a` has been successfully enqueued to this `Queue` */ def enqueue1(a: A): F[Unit] /** * Enqueues each element of the input stream to this `Queue` by * calling `enqueue1` on each element. */ def enqueue: Sink[F, A] = _.evalMap(enqueue1) /** Dequeues one `A` from this queue. Completes once one is ready. */ def dequeue1: F[A] /** Repeatedly calls `dequeue1` forever. */ def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat ... }
这个结构支持多线程操作,也就是说enqueue和dequeue可以在不同的线程里操作。值得关注的是:enqueue会block,只有在完成了dequeue后才能继续。这个dequeue就变成了抵消backpressure的有效方法了。具体操作方法是:上游在一个线程里用enqueue发送一个数据元素,然后等待下游完成在另一个线程里的dequeue操作,完成这个循环后再进行下一个元素的enqueue。enqueue代表akka-stream向scalaz-stream-fs2发送数据,可以用akka-stream的Sink构件来实现:
class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] { val in = Inlet[T]("inport") val shape = SinkShape.of(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = { pull(in) //initiate stream elements movement super.preStart() } override def onPush(): Unit = { q.enqueue1(Some(grab(in))).unsafeRun() pull(in) } override def onUpstreamFinish(): Unit = { q.enqueue1(None).unsafeRun() println("the end of stream !") completeStage() } override def onUpstreamFailure(ex: Throwable): Unit = { q.enqueue1(None).unsafeRun() completeStage() } setHandler(in,this) } }
以上这个akka-stream GraphStage描述了对上游每一个元素的enqueue动作。我们可以用scalaz-stream-fs2的flatMap来序列化运算两个线程里的enqueue和dequeue:
val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16)) .flatMap { q => Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture //enqueue Task(new thread) pipe.unNoneTerminate(q.dequeue) //dequeue in current thread }
这个函数返回fs2.Stream[Task,RowType],是一种运算方案,我们必须run来实际运算:
fs2Stream.map{row => toTypedRow(row)} .map(qmr => { println(s"州名: ${qmr.state}") println(s"县名:${qmr.county}") println(s"年份:${qmr.year}") println(s"取值:${qmr.value}") println("-------------") }).run.unsafeRun
通过测试运行,我们成功的为scalaz-stream-fs2实现了data streaming。
下面是本次示范的源代码:
import slick.jdbc.H2Profile.api._ import com.bayakala.funda._ import api._ import scala.language.implicitConversions import scala.concurrent.duration._ import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ import slick.basic.DatabasePublisher import akka._ import fs2._ import akka.stream.stage.{GraphStage, GraphStageLogic} class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] { val in = Inlet[T]("inport") val shape = SinkShape.of(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = { pull(in) //initiate stream elements movement super.preStart() } override def onPush(): Unit = { q.enqueue1(Some(grab(in))).unsafeRun() pull(in) } override def onUpstreamFinish(): Unit = { q.enqueue1(None).unsafeRun() println("end of stream !!!!!!!") completeStage() } override def onUpstreamFailure(ex: Throwable): Unit = { q.enqueue1(None).unsafeRun() completeStage() } setHandler(in,this) } } object AkkaStreamSource extends App { val aqmraw = Models.AQMRawQuery val db = Database.forConfig("h2db") // aqmQuery.result returns Seq[(String,String,String,String)] val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)} // type alias type RowType = (String,String,String,String) // user designed strong typed resultset type. must extend FDAROW case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW // strong typed resultset conversion function. declared implicit to remind during compilation implicit def toTypedRow(row: RowType): TypedRow = TypedRow(row._1,row._2,row._3,row._4) // construct DatabasePublisher from db.stream val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result) // construct akka source val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher) implicit val actorSys = ActorSystem("actor-system") implicit val ec = actorSys.dispatcher implicit val mat = ActorMaterializer() /* source.take(10).map{row => toTypedRow(row)}.runWith( Sink.foreach(qmr => { println(s"州名: ${qmr.state}") println(s"县名:${qmr.county}") println(s"年份:${qmr.year}") println(s"取值:${qmr.value}") println("-------------") })) */ val fs2Stream: Stream[Task,RowType] = Stream.eval(async.boundedQueue[Task,Option[RowType]](16)) .flatMap { q => Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture //enqueue Task(new thread) pipe.unNoneTerminate(q.dequeue) //dequeue in current thread } fs2Stream.map{row => toTypedRow(row)} .map(qmr => { println(s"州名: ${qmr.state}") println(s"县名:${qmr.county}") println(s"年份:${qmr.year}") println(s"取值:${qmr.value}") println("-------------") }).run.unsafeRun scala.io.StdIn.readLine() actorSys.terminate() }