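fs2 has also improved considerably in exception handling and resource safety. An fs2 Stream can raise an exception in several ways: functionally with fail, implicitly by throwing inside pure code, or from within an effectful computation. For example:
//functional style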
val err = Stream(1,2,3) ++ Stream.fail(new Exception("oh,no..."))
//> err : fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(1, 2, 3))), Segment(Emit(Chunk(()))).flatMap(<function1>))
//thrown implicitly from pure code
val err1 = Stream(1,2,3) ++ (throw new Exception("oh my god!"))
//> err1 : fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(1, 2, 3))), Segment(Emit(Chunk(()))).flatMap(<function1>))
//inside an effectful computation
val err2 = Stream.eval(Task.delay { throw new Exception("it suck!")})
//> err2 : fs2.Stream[fs2.Task,Nothing] = attemptEval(Task).flatMap(<function1>)
try err.toList catch {case e: Throwable => println(e.getMessage)}
//> oh,no...
//| res0: Any = ()
try err2.run.unsafeRun catch {case e: Throwable => println(e.getMessage)}
//> it suck!
err.map(_.toString).onError { case e => Stream.emit(e.getMessage) }.toList
//> res1: List[String] = List(1, 2, 3, oh,no...)
err1.map(_.toString).onError { case e => Stream.emit(e.getMessage) }.toList
//> res2: List[String] = List(1, 2, 3, oh my god!)
err2.onError { case e => Stream.emit(e.getMessage) }.runLog.unsafeRun
//> res3: Vector[String] = Vector(it suck!)
In the previous discussion we introduced the bracket function, which fs2 provides to guarantee the safe use of resources. Its definition looks like this:
def bracket[F[_],R,A](r: F[R])(use: R => Stream[F,A], release: R => F[Unit]) = Stream.mk {
StreamCore.acquire(r, release andThen (Free.eval)) flatMap (use andThen (_.get))
}
val counter = new java.util.concurrent.atomic.AtomicLong(0)
//> counter : java.util.concurrent.atomic.AtomicLong = 0
val acquire = Task.delay { println(s"acquiring:${counter.incrementAndGet}") }
//> acquire : fs2.Task[Unit] = Task
val release = Task.delay { println(s"releasing:${counter.decrementAndGet}") }
//> release : fs2.Task[Unit] = Task
val s1 = Stream.bracket(acquire)(_ => Stream(4,5,6) ++ err, _ => release)
s1.run.unsafeRun
//> acquiring:1
//| releasing:0
//| java.lang.Exception: oh,no...
//| at fs2Safety$$anonfun$main$1$$anonfun$3.apply(fs2Safety.scala:4)
//| at fs2Safety$$anonfun$main$1$$anonfun$3.apply(fs2Safety.scala:4)
s1.map(_.toString).onError {case e => Stream.emit(e.getMessage)}.runLog.unsafeRun
//> acquiring:1
//| releasing:0
//| res4: Vector[String] = Vector(4, 5, 6, 1, 2, 3, oh,no...)
We can also use attempt to collect every result, including the failure:
s1.attempt.runLog.unsafeRun //> acquiring:1
//| releasing:0
//| res5: Vector[fs2.util.Attempt[Int]] = Vector(Right(4), Right(5), Right(6), Right(1), Right(2), Right(3), Left(java.lang.Exception: oh,no...))
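The guarantee shown above, that release runs even when the stream fails, resembles try/finally in ordinary strict code. The following is a minimal sketch in plain Scala, assuming no fs2 at all: the bracket helper, the log buffer and the demo function are hypothetical names invented for illustration and are not the fs2 API.

```scala
object BracketSketch {
  // Hypothetical helper (not fs2): acquire a resource, use it, always release it.
  def bracket[R, A](acquire: => R)(use: R => A)(release: R => Unit): A = {
    val r = acquire
    try use(r) finally release(r)
  }

  // Records the order of events so that the guarantee can be inspected.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  // The use step fails, yet the release step still runs before the error escapes.
  def demo(): Either[Throwable, Int] =
    scala.util.Try {
      bracket { log += "acquire"; 42 } { r =>
        log += s"use $r"
        if (r > 0) throw new Exception("oh,no...")
        r
      } { _ => log += "release" }
    }.toEither
}
```

Calling BracketSketch.demo() yields a Left holding the exception, while log records the acquire, use and release steps in that order. The difference in fs2 is that bracket describes this behaviour for a stream that is evaluated later, rather than running it on the spot.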
Here is another example, in which bracket acquires a resource at an intermediate stage of the stream:
def logBracket[A]: A => Stream[Task,A] = a => {
Stream.bracket(Task.delay { println(s"acquiring $a"); a })(
_ => Stream.emit(a),
_ => Task.delay { println(s"releasing $a") })
} //> logBracket: [A]=> A => fs2.Stream[fs2.Task,A]
Stream(3).flatMap(logBracket).map{ n =>
if (n>2) sys.error("oh no...") else n }.run.unsafeAttemptRun
//> acquiring 3
//| releasing 3
//| res6: fs2.util.Attempt[Unit] = Left(java.lang.RuntimeException: oh no...)
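What resource safety really means is that the cleanup handler is guaranteed to run no matter how the computation terminates. A computation can terminate in the following ways:
1. Normal termination. For example, Stream(1,2,3) ends after emitting the single chunk Chunk(1,2,3).
2. Termination by failure. An exception occurs partway through the computation.
3. Forced termination. The consumer stops the stream early, e.g. Stream.range(1,5).take(1) ends immediately after emitting element 1.
The third case deserves attention. Consider this example first: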
val s5 = (Stream(1) ++ Stream.fail(new Exception("oh no...")))
//> s5 : fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(1))), Segment(Emit(Chunk(()))).flatMap(<function1>))
s5.map(_.toString).onError {case e => Stream.emit(e.getMessage)}.toList
//> res7: List[String] = List(1, oh no...)
s5.take(1).toList //> res8: List[Int] = List(1)
(Stream("a") ++ Stream("bc")).onComplete(Stream.emit("completed!")).toList
//> res9: List[String] = List(a, bc, completed!)
s5.map(_.toString).onComplete(Stream.emit("completed!")).take(1).toList
//> res10: List[String] = List(1)
s5.covary[Task].map(_.toString).onFinalize(Task.delay { println("finalized!")})
.take(1).runLog.unsafeRun //> finalized!
//| res11: Vector[String] = Vector(1)
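We can see that s5 raises an exception, which onError can catch. Curiously, no exception occurs once take(1) is applied. That is because take(1) forces termination: the stream stops right after emitting one element, before fail has been processed. Note that after a forced termination onComplete does not run, whereas onFinalize runs in every case.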
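The reason take(1) never reaches the failure can be illustrated with the standard library's lazy Iterator. This is only an analogy sketched in plain Scala, not how fs2 is implemented: the object and member names are hypothetical, and the sketch relies on Iterator's ++ taking its argument by name.

```scala
object LazyTakeSketch {
  // Fails as soon as it is evaluated.
  def failing: Iterator[Int] = throw new Exception("oh no...")

  // `++` on Iterator takes its argument by name, so `failing` is not evaluated here.
  def source: Iterator[Int] = Iterator(1) ++ failing

  // Stops after one element, so the failing part is never touched.
  val firstOnly: List[Int] = source.take(1).toList

  // Draining everything does reach the failing part.
  val all: scala.util.Try[List[Int]] = scala.util.Try(source.toList)
}
```

Here firstOnly is List(1) and all is a Failure, mirroring the behaviour of s5.take(1).toList and s5.toList in the example above.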
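On the subject of safe computation: FP relies mainly on recursion, and flatMap is itself a recursive algorithm, so can fs2 guarantee that long-running computations stay safe? The following test programs are the most representative demonstration: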
// Sanity tests - not run as part of unit tests, but these should run forever
// at constant memory.
//
object ResourceTrackerSanityTest extends App {
val big = Stream.constant(1).flatMap { n =>
Stream.bracket(Task.delay(()))(_ => Stream.emits(List(1, 2, 3)), _ => Task.delay(()))
}
big.run.unsafeRun
}
object RepeatPullSanityTest extends App {
def id[A]: Pipe[Pure, A, A] = _ repeatPull Pull.receive1 { case h #: t => Pull.output1(h) as t }
Stream.constant(1).covary[Task].throughPure(id).run.unsafeRun
}
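The tests above only show that fs2 runs such recursive definitions in constant memory. They do not show how. One general technique for making deep recursion safe is trampolining, sketched below with scala.util.control.TailCalls from the standard library. It is offered as an illustration of the idea under that assumption, not as a description of fs2's internals; isEven and isOdd are hypothetical example functions.

```scala
import scala.util.control.TailCalls._

object TrampolineSketch {
  // Mutually recursive functions: each call returns a description of the next
  // step instead of growing the call stack.
  def isEven(n: Int): TailRec[Boolean] =
    if (n == 0) done(true) else tailcall(isOdd(n - 1))

  def isOdd(n: Int): TailRec[Boolean] =
    if (n == 0) done(false) else tailcall(isEven(n - 1))

  // `result` runs the steps in a loop, so a million hops do not overflow the stack.
  val answer: Boolean = isEven(1000000).result
}
```

Written as direct mutual recursion, the same definitions would be expected to overflow the stack for an input of this size.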
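From the discussion above we know that bracket is the mechanism fs2 recommends for safe computation. We can use bracket to access resources of our own, such as a database or a peripheral device, and be sure that the cleanup logic takes effect once the computation terminates. fs2 also provides its own file reading and writing functions in the io.file object, and these functions are resource safe as well: when the use of fs2.io.file ends, the cleanup is guaranteed to run. Here is a classic fs2.io.file example: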
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0) //> fahrenheitToCelsius: (f: Double)Double
val converter: Task[Unit] =
io.file.readAll[Task](java.nio.file.Paths.get("/users/tiger-macpro/fahrenheit.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(java.nio.file.Paths.get("/users/tiger-macpro/celsius.txt")))
.run //> converter : fs2.Task[Unit] = Task
converter.unsafeRun
// signatures of the fs2 functions used above
def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] = {...}
/** Converts UTF-8 encoded byte stream to a stream of `String`. */
def utf8Decode[F[_]]: Pipe[F, Byte, String] =
_.chunks.through(utf8DecodeC)
def writeAll[F[_]](path: Path, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE))(implicit F: Effect[F]): Sink[F, Byte] = {...}
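The result type of writeAll is Sink[F,Byte], which means its input is a Stream[F,Byte], so we must first use utf8Encode to convert String to Byte. text.lines is fs2's own line-oriented text iterator: in fs2 the Java iterator is no longer used. In addition, intersperse inserts the line separators between elements before they are converted from String to Byte. In this discussion we have mainly covered functions from the pipe object. We will introduce pipe2 in the next discussion, on multithreaded computation.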
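To make the per-line logic of the converter concrete, the sketch below applies the same filter, map and line-joining steps to an in-memory list in plain Scala. The sample input is hypothetical and stands in for the contents of fahrenheit.txt. No fs2 and no file I/O are involved, so none of the resource-safety behaviour is exercised here.

```scala
object ConverterSketch {
  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  // Hypothetical sample input: a comment line, a value, a blank line, a value.
  val input: List[String] = List("// temperatures", "32.0", "", "212.0")

  // Same predicate and conversion as in the stream pipeline.
  val celsius: List[Double] =
    input
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble))

  // mkString("\n") plays the role of intersperse("\n") followed by concatenation.
  val output: String = celsius.map(_.toString).mkString("\n")
}
```

The comment line and the blank line are dropped, leaving two converted values separated by a single newline, which is the text that the stream version hands to utf8Encode and writeAll.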