Scalaz (47) – scalaz-stream: A Closer Look at Source

by admin on October 21, 2018

 

type BigStringResult = String
val qJobResult = async.unboundedQueue[BigStringResult]
                       //> qJobResult  : scalaz.stream.async.mutable.Queue[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.mutable.Queue$$anon$1@25d250c6
def longGet(jobnum: Int): BigStringResult = {
  Thread.sleep(2000)
  s"Some large data sets from job#${jobnum}"
}                      //> longGet: (jobnum: Int)demo.ws.blogStream.BigStringResult

// multi-tasking: enqueue three results concurrently
val start = System.currentTimeMillis()        //> start  : Long = 1468556250797
Task.fork(qJobResult.enqueueOne(longGet(1))).unsafePerformAsync{case _ => ()}
Task.fork(qJobResult.enqueueOne(longGet(2))).unsafePerformAsync{case _ => ()}
Task.fork(qJobResult.enqueueOne(longGet(3))).unsafePerformAsync{case _ => ()}
val timemill = System.currentTimeMillis() - start
                                              //> timemill  : Long = 17
Thread.sleep(3000)
qJobResult.close.run
val bigData = {
  // dequeue the three results (arrival order is nondeterministic)
  val j1 = qJobResult.dequeue
  val j2 = qJobResult.dequeue
  val j3 = qJobResult.dequeue
  for {
    r1 <- j1
    r2 <- j2
    r3 <- j3
  } yield r1 + "," + r2 + "," + r3
}                       //> bigData  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Await(scalaz.concurrent.Task@778d1062,<function1>,<function1>)

bigData.runLog.run      //> res9: Vector[String] = Vector(Some large data sets from job#2,Some large data sets from job#3,Some large data sets from job#1)

Applying open to a Stream yields a Pull. pll is a Pull data structure; its type is defined as follows:

import scala.concurrent.duration._
implicit val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(3)
                //> scheduler  : java.util.concurrent.ScheduledExecutorService = java.util.concurrent.ScheduledThreadPoolExecutor@516be40f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
val fizz = time.awakeEvery(3.seconds).map(_ => "fizz")
                //> fizz  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@5762806e,<function1>,<function1>)
val fizz3 = fizz.take(3)   //> fizz3  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
fizz3.runLog.run           //> res9: Vector[String] = Vector(fizz, fizz, fizz)

6. The bracket function strengthens resource safety, in particular the cleanup of resources held across threads. onFinalize replaces onComplete.

 

   
fs2 is the new incarnation of scalaz-stream. It keeps scalaz-stream's pull-model data-flow principles but adopts a brand-new implementation. Compared with scalaz-stream, fs2 offers simpler core combinators, safer types and resource usage (type safety, resource safety), and better runtime efficiency. Since fs2 largely follows scalaz-stream's principles, the discussion below focuses on using fs2. According to the official fs2 documentation, fs2 adds the following new features:

 

(Stream(1,2,3) ++ Stream(4,5)).toList             //> res5: List[Int] = List(1, 2, 3, 4, 5)
Stream(1,2,3).map { _ + 1}.toList                 //> res6: List[Int] = List(2, 3, 4)
Stream(1,2,3).filter { _ % 2 == 0}.toList         //> res7: List[Int] = List(2)
Stream(1,2,3).fold(0)(_ + _).toList               //> res8: List[Int] = List(6)
Stream(None,Some(1),Some(3),None).collect {
  case None => 0
  case Some(i) => i
}.toList                                          //> res9: List[Int] = List(0, 1, 3, 0)
Stream.range(1,5).intersperse(42).toList          //> res10: List[Int] = List(1, 42, 2, 42, 3, 42, 4)
Stream(1,2,3).flatMap {x => Stream(x,x)}.toList   //> res11: List[Int] = List(1, 1, 2, 2, 3, 3)
Stream(1,2,3).repeat.take(5).toList               //> res12: List[Int] = List(1, 2, 3, 1, 2)

await(Task.delay{3})(emit)
//> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@57855c9a,<function1>,<function1>)
eval(Task.delay{3})
//> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@63e2203c,<function1>,<function1>)

5. More concurrency primitives (concurrent primitives)

We can also turn a Java callback into a Task:

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

 

In the example above we used connector functions such as through and to. Since the data is ultimately sent to the stdOut sink, we don't need runLog to collect the results.

def emitSeq[A](xa: Seq[A]): Process[Task,A] =
  xa match {
    case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t)
    case Nil => halt
  }                                     //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A]
val es1 = emitSeq(Seq(1,2,3))           //> es1  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await(scalaz.concurrent.Task@2d6eabae,<function1>,<function1>),Vector(<function1>))
val es2 = emitSeq(Seq("a","b","c"))     //> es2  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await(scalaz.concurrent.Task@4e7dc304,<function1>,<function1>),Vector(<function1>))
es1.runLog.run                          //> res7: Vector[Int] = Vector(1, 2, 3)
es2.runLog.run                          //> res8: Vector[String] = Vector(a, b, c)

We first run this effectful stream to get a Task, then run the Task to obtain the computed value:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] 

case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] 

8. Stream replaces Process. fs2 no longer has the type aliases Process1, Tee, Wye, and Channel; in their place are:
  

>>> I => F[O] is a function that returns a result: it performs the F computation on the input I and returns an O, e.g. writing a record to a database and returning a write status.

def myTakeC[F[_],A](n: Int): Pipe[F,A,A] = {
  def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
     if ( n <= 0 ) Pull.done
     else Pull.awaitLimit(n)(h).flatMap {case Step(chunk,h) =>
       if (chunk.size <= n) Pull.output(chunk) >> go(n-chunk.size)(h)
       else Pull.output(chunk.take(n)) }
  }
  sin => sin.pull(go(n))
}                       //> myTakeC: [F[_], A](n: Int)fs2.Pipe[F,A,A]
val s1 = (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7))
                       //> s1  : fs2.Stream[Nothing,Int] = append(append(Segment(Emit(Chunk(1, 2))), Segment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).flatMap(<function1>))
s1.pure.through(myTake(4)).chunks.toList  //> res20: List[fs2.Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4))
s1.pure.through(myTakeC(4)).chunks.toList //> res21: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4))

>>> Process1[I,O]: I is the raw data read from the file, O is the output produced by filtering and processing.

s3.through(myFilter(_ % 2 == 0)).through(myTake(3)).runLog.unsafeRun
                                                  //> res23: Vector[Int] = Vector(2, 2, 2)

Now let's look at a Topic example:

Below we look at some basic fs2 operations:

 

 

val p: Process[Task,Int] = emitAll(Seq(1,2,3))
   //> p  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))

emitAll(Seq(1,2,3)).toSource
   //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))

Pure streams support many List-like operations:

Append[+F[_],+O]: Append is the linking type for Process[F,O]. Not only does it carry the element O forward, but more importantly it passes the previous node's F computation on to the next node, so that the next node can run the previous node's finalizer. Append can combine multiple nodes into one big node: head is the first node, and stack is a sequence of functions, each taking the previous node's completion Cause and producing the next node's state.

 

Emit[O](out: O): emits a single O element; it cannot perform any additional computation.

Stream(1,2,3).repeat.take(10).filter(_ % 2 == 0).toList
                                  //> res15: List[Int] = List(2, 2, 2)

 

Another Pipe implementation example: take

Queue, Topic, and Signal can all serve as builders of effectful data sources. Let's first see how Queue produces a data source:

 

Downstream: Sink >>> Process[F[_], O => F[Unit]], Channel >>> Process[F[_], I => F[O]]

Stream's type signature is Stream[F[_],A]. From the examples above we see that some F[_] are Nothing; we call such streams pure streams. Also note that each stream construction forms a Chunk, representing a segment of elements. fs2 adds the Chunk type to improve the efficiency of element processing; this is one of fs2's new capabilities.

All of the above are ways to construct Process0, which also counts as a data source. But it only represents values already in memory, which is of little use to us, because we want to obtain those values from external devices, e.g. read data from a file or a database; in other words, we need the F effect. Process0[O] >>> Process[Nothing,O], whereas what we need is Process[F,O]. So how should we write that?

 

Halt(cause: Cause): stops emitting; cause is the reason for stopping: End for normal completion, Err for termination due to error, Kill for forced termination.

 

 

 

The above explained and demonstrated the various ways to produce a possibly side-effecting Source. The other node types of scalaz-stream will be covered in depth in the following discussions.

fs2's real strength is concurrent programming. In the Stream type signature Stream[F[_],A], F[_] is a possibly side-effecting computation; when F[_] is Nothing, Stream[Nothing,A] is a pure stream, while Stream[F[_],A] is an effectful stream. By performing computations during the state transitions of an effectful stream we realize F's side effects: database reads and writes, IO operations, and so on. fs2 is no longer bound to Task as the only effect: any monad with a Catchable instance can serve as the stream's effect type. That said, for a library centered on concurrent programming, no effect type fits better than Task.
We can lift a pure stream into an effectful stream:

We can also use a timer to produce a Process[Task,A]:

 

def getData(dbName: String): Task[String] = Task.async { cb =>
   import scala.concurrent._
   import scala.concurrent.ExecutionContext.Implicits.global
   import scala.util.{Success,Failure}
   Future { s"got data from $dbName" }.onComplete {
     case Success(a) => cb(a.right)
     case Failure(e) => cb(e.left)
   }
}                                        //> getData: (dbName: String)scalaz.concurrent.Task[String]
val procGetData = eval(getData("MySQL")) //> procGetData  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@dd3b207,<function1>,<function1>)
procGetData.runLog.run                   //> res9: Vector[String] = Vector(got data from MySQL)

3. fs2 is no longer limited to Task as the only effect type; users can supply their own effect types.

 
The main design goal of the scalaz-stream library is functional I/O: users compose complete I/O programs out of small, single-purpose I/O functions. Another goal is resource safety: I/O programs written with scalaz-stream guarantee safe resource usage; in particular, once an I/O task completes, all occupied resources, including file handles and memory, are released automatically. In the previous discussion we gave a rough overview of scalaz-stream's core type Process, though most of the time was spent on the Process1 channel type. In this discussion we introduce the design principles and intended usage of a complete scalaz-stream chain from a practical angle. We mentioned that Process has the three states Emit/Await/Halt, and that Append is an important type for linking stream nodes. First let's look at how these types are defined in scalaz-stream:

Stream(1,2,3).repeat.pure
  .through(pipe.take(10))
  .through(pipe.filter(_ % 2 == 0))
  .toList                                         //> res14: List[Int] = List(2, 2, 2)

Now not only does the type match, the expression also contains a Task computation. Through Task.delay we can perform side-effecting computations such as file reads, because Await will run req: F[E] >>> Task[Int]. This is exactly the Source we need. So can we use this Source to produce a sequence of values?

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

def await1[F[_],I]: Handle[F,I] => Pull[F,Nothing,Step[I,Handle[F,I]]] = {...}

def receive1[F[_],I,O,R](f: Step[I,Handle[F,I]] => Pull[F,O,R]): Handle[F,I] => Pull[F,O,R] =
    _.await1.flatMap(f)

def pull[F[_],F2[_],A,B](s: Stream[F,A])(using: Handle[F,A] => Pull[F2,B,Any])(implicit S: Sub1[F,F2])
  : Stream[F2,B] =
    Pull.close { Sub1.substPull(open(s)) flatMap (h => Sub1.substPull(using(h))) }

Midstream: Transducer >>> Process1[I,O]

 

We see that Process[F,O] is wrapped inside the Trampoline type, so Process achieves structured function composition via Trampoline, which effectively avoids StackOverflowError in large stream computations. Setting aside the Trampoline-related syntax, the types above can be simplified into the following conceptual structure:
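The trampolining idea itself can be sketched in a few lines of plain Scala (a simplified illustration only; scalaz's actual Trampoline also supports a FlatMap/bind case):

```scala
// Minimal trampoline sketch: recursion is reified as data and driven by a
// loop, so arbitrarily deep "recursion" runs on constant stack.
sealed trait Trampoline[A]
case class Done[A](a: A) extends Trampoline[A]
case class Suspend[A](thunk: () => Trampoline[A]) extends Trampoline[A]

def run[A](t: Trampoline[A]): A = {
  var cur: Trampoline[A] = t
  while (true) cur match {
    case Done(a)     => return a        // reached a result
    case Suspend(th) => cur = th()      // take one deferred step
  }
  sys.error("unreachable")
}

// Written as a plain recursive call, n = 1,000,000 would overflow the stack:
def countDown(n: Int): Trampoline[Int] =
  if (n <= 0) Done(0) else Suspend(() => countDown(n - 1))
```

Here `run(countDown(1000000))` completes normally because each step allocates a small Suspend node on the heap instead of a stack frame.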

The above demonstrates some basic List-like operations.

import scala.concurrent._
import scala.concurrent.duration._
import scalaz.stream.async.mutable._
import scala.concurrent.ExecutionContext.Implicits.global
val sharedData: Topic[BigStringResult] = async.topic()
     //> sharedData  : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.package$$anon$1@797badd3
val subscriber = sharedData.subscribe.runLog    //> subscriber  : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = scalaz.concurrent.Task@226a82c4
val otherThread = future {
  subscriber.run // Added this here - now subscriber is really attached to the topic
}                //> otherThread  : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List()
// Need to give subscriber some time to start up.
// I doubt you'd do this in actual code.

// topics seem more useful for hooking up things like
// sensors that produce a continual stream of data,
// and where individual values can be dropped on the floor.
Thread.sleep(100)

sharedData.publishOne(longGet(1)).run // don't just call publishOne; need to run the resulting task
sharedData.close.run // Don't just call close; need to run the resulting task

// Need to wait for the output
val result = Await.result(otherThread, Duration.Inf)
     //> result  : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)

class Pull[+F[_],+O,+R](private[fs2] val get: Free[P[F,O]#f,Option[Either[Throwable,R]]])
trait Process[+F[_],+O]
case object Cause

case class Emit[O](out: O) extends Process[Nothing, O]

case class Halt(cause: Cause) extends Process[Nothing,Nothing]

case class Await[+F[_],E,+O](
  req: F[E],
  rcv: E => Process[F,O],
  preempt: E => Process[F,Nothing] = Halt) extends Process[F,O]

case class Append[+F[_],+O](
  head: Process[F,O],
  stack: Vector[Cause => Process[F,O]]) extends Process[F,O]

 

Process[F[_],O]: from its type we can infer that scalaz-stream can perform possibly side-effecting F computations in the course of emitting O-type elements.

 

def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ???
                               //> read: (callback: scalaz.\/[Throwable,Array[Byte]] => Unit)Unit
val t: Task[Array[Byte]] = Task.async(read)     //> t  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@1a677343
val t2: Task[Array[Byte]] = for {
  bytes <- t
  moarBytes <- t
} yield (bytes ++ moarBytes)          //> t2  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@15de0b3c
val prct2 = Process.eval(t2)          //> prct2  : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@15de0b3c,<function1>,<function1>)

def asyncRead(succ: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ???
                        //> asyncRead: (succ: Array[Byte] => Unit, fail: Throwable => Unit)Unit
val t3: Task[Array[Byte]] = Task.async { callback =>
   asyncRead(b => callback(b.right), err => callback(err.left))
}                      //> t3  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@489115ef
val t4: Task[Array[Byte]] = t3.flatMap(b => Task(b))
                       //> t4  : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@3857f613
val prct4 = Process.eval(t4)      //> prct4  : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@3857f613,<function1>,<function1>)
/**
 * Chunk represents a strict, in-memory sequence of `A` values.
 */
trait Chunk[+A] { self =>
  def size: Int
  def uncons: Option[(A, Chunk[A])] =
    if (size == 0) None
    else Some(apply(0) -> drop(1))
  def apply(i: Int): A
  def copyToArray[B >: A](xs: Array[B]): Unit
  def drop(n: Int): Chunk[A]
  def take(n: Int): Chunk[A]
  def filter(f: A => Boolean): Chunk[A]
  def foldLeft[B](z: B)(f: (B,A) => B): B
  def foldRight[B](z: B)(f: (A,B) => B): B
  def indexWhere(p: A => Boolean): Option[Int] = {
    val index = iterator.indexWhere(p)
    if (index < 0) None else Some(index)
  }
  def isEmpty = size == 0
  def toArray[B >: A: ClassTag]: Array[B] = {
    val arr = new Array[B](size)
    copyToArray(arr)
    arr
  }
  def toList = foldRight(Nil: List[A])(_ :: _)
  def toVector = foldLeft(Vector.empty[A])(_ :+ _)
  def collect[B](pf: PartialFunction[A,B]): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    iterator.collect(pf).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def map[B](f: A => B): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    iterator.map(f).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def mapAccumulate[S,B](s0: S)(f: (S,A) => (S,B)): (S,Chunk[B]) = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    var s = s0
    for { c <- iterator } {
      val (newS, newC) = f(s, c)
      buf += newC
      s = newS
    }
    (s, Chunk.indexedSeq(buf))
  }
  def scanLeft[B](z: B)(f: (B, A) => B): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size + 1)
    iterator.scanLeft(z)(f).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def iterator: Iterator[A] = new Iterator[A] {
    var i = 0
    def hasNext = i < self.size
    def next = { val result = apply(i); i += 1; result }
  }
...

A complete scalaz-stream is made up of three kinds of nodes: Source (origin) / Transducer (transformation) / Sink (terminal). Nodes are linked via Await or Append. Let's look again at the type signatures of Source/Transducer/Sink:

 

In the demo above we used await to run a Task and got back Process[Task,?], a possibly side-effecting Source. In practice we often need Task to fetch data from external sources; such data sources usually perform data acquisition asynchronously and deliver the data through callbacks. We can use the Task.async function to turn those callbacks into a Task, and in the next step Process.eval or await lifts the Task into Process[Task,?]. Let's start with a simple example: suppose we use scala.concurrent.Future for asynchronous reads; we can convert the Future into a Process like this:

We can use through to connect these transducers:

 

implicit class HandleOps[+F[_],+A](h: Handle[F,A]) {
    def push[A2>:A](c: Chunk[A2])(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =
      self.push(h: Handle[F,A2])(c)
    def push1[A2>:A](a: A2)(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =
      self.push1(h: Handle[F,A2])(a)
    def #:[H](hd: H): Step[H, Handle[F,A]] = Step(hd, h)
    def await: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.await(h)
    def await1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.await1(h)
    def awaitNonempty: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = Pull.awaitNonempty(h)
    def echo1: Pull[F,A,Handle[F,A]] = Pull.echo1(h)
    def echoChunk: Pull[F,A,Handle[F,A]] = Pull.echoChunk(h)
    def peek: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.peek(h)
    def peek1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.peek1(h)
    def awaitAsync[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):
      Pull[F2, Nothing, AsyncStep[F2,A2]] = self.awaitAsync(Sub1.substHandle(h))
    def await1Async[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):
      Pull[F2, Nothing, AsyncStep1[F2,A2]] = self.await1Async(Sub1.substHandle(h))
    def covary[F2[_]](implicit S: Sub1[F,F2]): Handle[F2,A] = Sub1.substHandle(h)
  }

  implicit class HandleInvariantEffectOps[F[_],+A](h: Handle[F,A]) {
    def invAwait1Async[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):
      Pull[F, Nothing, AsyncStep1[F,A2]] = self.await1Async(h)
    def invAwaitAsync[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):
      Pull[F, Nothing, AsyncStep[F,A2]] = self.awaitAsync(h)
    def receive1[O,B](f: Step[A,Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await1.flatMap(f)
    def receive[O,B](f: Step[Chunk[A],Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await.flatMap(f)
  }

Await[+F[_],E,+O]: this is the core Process state of an effectful stream. It first runs the F computation req; the result E is fed into the function rcv to transition to the next Process state, and afterwards the cleanup function preempt is run. This is essentially a flatMap-like structure. Note that E is an internal type: once produced by the F computation and passed to rcv, it is never referenced again. We can release resources in the preempt function. If we want to build an effectful stream, Await is evidently the type to use.

 

We can describe the role of a complete scalaz-stream chain with a file-processing workflow:

(Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)).chunks.toList
    //> res16: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4, 5), Chunk(6, 7))

 

throughPure above is equivalent to through + pure. Pure is an F[_] with no effect at all, a type that exists specifically to help the compiler with type inference. We can also lift the pure stream with pure first and then use through:

The type matches too, but there is no trace of Task inside the Emit(...) expression, so this cannot satisfy our requirements for a Source. It seems the only way is the following:

Inside Pull we use await1 or receive1 to pull a Step data structure out of the Handle and then output it into the Pull structure. Closing this Pull gives us the Stream we want. The types and function signatures used in the example are listed below:

>>> O => F[Unit] is a function that returns no result: it performs the F computation on the input O-type data, e.g. writing the O value into a file.

Stream.emits(Seq(1,2,3)).toList        //> res3: List[Int] = List(1, 2, 3)
Stream.emits(Seq(1,2,3)).toVector      //> res4: Vector[Int] = Vector(1, 2, 3)

 

val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))
val t2 = s2.runLog                                //> t2  : fs2.Task[Vector[Int]] = Task
t2.unsafeRun                                      //> res22: Vector[Int] = Vector(1, 2, 3)

The workflow above, briefly: read data from a file -> process the data read -> write it to another file. It sounds simple when described, but our goal is safe resource usage: under any termination scenario, whether normal completion, forced termination mid-way, or termination on error, scalaz-stream will actively close the opened files, stop the threads in use, and release occupied memory and any other resources. Seen this way, it is not so simple after all. Let's first try to analyze the roles of the Source/Transducer/Sink types:
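As a concrete instance of this read -> transform -> write chain, here is a lightly annotated version of the temperature-converter program from the scalaz-stream README (the file names and the fahrenheitToCelsius helper are illustrative; resource safety means both files are closed on completion, error, or kill):

```scala
import scalaz.concurrent.Task
import scalaz.stream._

// Illustrative line-by-line transformation.
def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0 / 9.0)

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")                    // Source: Process[Task,String]
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))  // Transducer: drop blanks/comments
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)                                // Transducer: String => ByteVector
    .to(io.fileChunkW("testdata/celsius.txt"))            // Sink
    .run

// Nothing happens until the Task itself is run:
// converter.run
```

The whole pipeline is just a description; running the final Task is what opens the files, streams the data through, and releases the resources.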

 

Process[F[_],O]: reads the O values in the file via F[O]; here F has side effects.

 

Let's verify:

 

import Process._
emit(0)                        //> res0: scalaz.stream.Process0[Int] = Emit(Vector(0))
emitAll(Seq(1,2,3))            //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
Process(1,2,3)                 //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3))
Process()                      //> res3: scalaz.stream.Process0[Nothing] = Emit(List())

Upstream: Source >>> Process0[O] >>> Process[F[_],O]

Now the compiler no longer reports errors. The functions in fs2's pipe object become Stream's own methods via method injection or type inheritance, so we can also apply these transducers directly on the Stream type:

Here is the type signature of the scalaz Task callback used directly: def async(callback: (Throwable \/ Unit) => Unit):

  • type Pipe[F,A,B] = Stream[F,A] => Stream[F,B]
  • type Pipe2[F,A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]
  • Pipe replaces Channel and Process1
  • Pipe2 replaces Tee and Wye

 

fs2 divides a Stream into chunks according to its construction batches. Let's demonstrate how to use the Chunk mechanism with Pull:

import com.ning.http.client._
val asyncHttpClient = new AsyncHttpClient()     //> asyncHttpClient  : com.ning.http.client.AsyncHttpClient = com.ning.http.client.AsyncHttpClient@245b4bdc
def get(s: String): Task[Response] = Task.async[Response] { callback =>
  asyncHttpClient.prepareGet(s).execute(
    new AsyncCompletionHandler[Unit] {
      def onCompleted(r: Response): Unit = callback(r.right)

      def onError(e: Throwable): Unit = callback(e.left)
    }
  )
}                 //> get: (s: String)scalaz.concurrent.Task[com.ning.http.client.Response]
val prcGet = Process.eval(get("http://sina.com"))
                  //> prcGet  : scalaz.stream.Process[scalaz.concurrent.Task,com.ning.http.client.Response] = Await(scalaz.concurrent.Task@222545dc,<function1>,<function1>)
prcGet.run.run    //> 12:25:27.852 [New I/O worker #1] DEBUG c.n.h.c.p.n.r.NettyConnectListener -Request using non cached Channel '[id: 0x23fa1307, /192.168.200.3:50569 =>sina.com/66.102.251.33:80]':

We mentioned earlier that fs2 implements transducers with a brand-new approach and data type. A transducer's type is Pipe, i.e.:

 

 

myTake and myTakeC produce different results.

We have already mentioned that one of fs2's enhancements is the addition of the chunk type and its related operations. Chunk is a collection type fs2 uses internally, allowing fs2 to process data chunk by chunk. Chunk itself carries a full set of collection functions:
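The payoff of chunking can be illustrated with plain Scala collections (an analogy only, not fs2's actual Chunk type): operating a chunk at a time replaces per-element dispatch with one collection operation per segment:

```scala
// Analogy: a "stream" of ten elements held as three chunks.
val chunks: List[Vector[Int]] =
  List(Vector(1, 2), Vector(3, 4, 5), Vector(6, 7, 8, 9, 10))

// Chunk-wise filter: one operation per chunk instead of per element,
// mirroring fs2's pipe.filter, which is implemented via mapChunks.
val evens: List[Vector[Int]] = chunks.map(_.filter(_ % 2 == 0))

val flat: Vector[Int] = evens.flatten.toVector
```

Here flat is Vector(2, 4, 6, 8, 10), the same result a per-element filter would give, but the work was dispatched three times rather than ten.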

 

Stream(1,2,3).repeat
  .throughPure(pipe.take(10))
  .throughPure(pipe.filter(_ % 2 == 0))
  .toList                                    //> res13: List[Int] = List(2, 2, 2)

 

2. Stream elements gain the chunk type and related operations

Stream()                       //> res0: fs2.Stream[Nothing,Nothing] = Segment(Emit(Chunk()))
Stream(1,2,3)                  //> res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
Stream.emit(4)                 //> res2: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(4)))
Stream.emits(Seq(1,2,3))       //> res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))

We see that Pipe is just a type alias for Function1, a lambda: given a Stream[F,I], it returns a Stream[F,O]. So how do we read the elements of a Stream[F,I] in fs2? As mentioned earlier, this is done through a new data structure, Pull. Let's first see how fs2 implements the Stream >> Pull >> Stream conversion:

 

 

We can use toList or toVector to evaluate the element values of a pure stream:

1. No third-party dependencies at all

 

def myTake[F[_],A](n: Int): Pipe[F,A,A] = {
   def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
      if (n <= 0) Pull.done
      else h.receive1 { case a #: h => Pull.output1(a).flatMap{_ => go(n-1)(h)}}
   }
   sin => sin.pull(go(n))
}                                                 //> myTake: [F[_], A](n: Int)fs2.Pipe[F,A,A]
Stream.range(0,10).pure.through(myTake(3)).toList //> res18: List[Int] = List(0, 1, 2)

import scala.language.higherKinds
def myFilter[F[_],A](f: A => Boolean): Pipe[F, A, A] = {
  def go(h: Stream.Handle[F,A]): Pull[F,A,Unit] =  {
//      h.receive1 {case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
       h.await1.flatMap { case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
  }
//  sin => sin.open.flatMap {h => go(h)}.close
  sin => sin.pull(go _)
}                                   //> myFilter: [F[_], A](f: A => Boolean)fs2.Pipe[F,A,A]

Stream.range(0,10).pure.through(myFilter(_ % 2 == 0)).toList
                                     //> res17: List[Int] = List(0, 2, 4, 6, 8)

Sure enough, among the functions Handle provides are reading functions such as await and receive. Let's try implementing a simple transducer: a filter function:

 

object pipe {
...
  /** Drop `n` elements of the input, then echo the rest. */
  def drop[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
    _ pull (h => Pull.drop(n)(h) flatMap Pull.echo)
...
  /** Emits `true` as soon as a matching element is received, else `false` if no input matches */
  def exists[F[_], I](p: I => Boolean): Stream[F, I] => Stream[F, Boolean] =
    _ pull { h => Pull.forall[F,I](!p(_))(h) flatMap { i => Pull.output1(!i) }}

  /** Emit only inputs which match the supplied predicate. */
  def filter[F[_], I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
    mapChunks(_ filter f)

  /** Emits the first input (if any) which matches the supplied predicate, to the output of the returned `Pull` */
  def find[F[_],I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
    _ pull { h => Pull.find(f)(h).flatMap { case o #: h => Pull.output1(o) }}

  /**
   * Folds all inputs using an initial value `z` and supplied binary operator,
   * and emits a single element stream.
   */
  def fold[F[_],I,O](z: O)(f: (O, I) => O): Stream[F,I] => Stream[F,O] =
    _ pull { h => Pull.fold(z)(f)(h).flatMap(Pull.output1) }
...
  /** Emits all elements of the input except the first one. */
  def tail[F[_],I]: Stream[F,I] => Stream[F,I] =
    drop(1)

  /** Emit the first `n` elements of the input `Handle` and return the new `Handle`. */
  def take[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
    _ pull Pull.take(n)
...

def stdOut: Sink[Task,String]  =
  _.evalMap { x => Task.delay{ println(s"milli: $x")}}
                                                  //> stdOut: => fs2.Sink[fs2.Task,String]
Stream.repeatEval(Task.delay{System.currentTimeMillis})
  .map(_.toString)
  .through(myTake(3))
  .to(stdOut)
  .run.unsafeRun                                  //> milli: 1472001934708
                                                  //| milli: 1472001934714
                                                  //| milli: 1472001934714

 

Among Pull's type parameters, F is a computation, O is the output element type, and R is the data resource inside the Pull; we read elements from R. In the example above, pll's R value is a Handle type, and this type should provide methods for reading elements:

As we know, this corresponds to Process1 in scalaz-stream, the transducer, which is responsible for state transformation of a stream. In fs2 the transducer is Pipe (also a channel), and we usually connect transducers with through. take, filter, etc. in the demo above are all transducers; we can find these functions in object pipe:

val pll = Stream(1,2,3).pure.open    //> pll  : fs2.Pull[fs2.Pure,Nothing,fs2.Stream.Handle[fs2.Pure,Int]] = fs2.Pull@de5031f
val strm = pll.close                 //> strm  : fs2.Stream[fs2.Pure,Nothing] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)

The example below shows an fs2 effectful stream running all the way from the Source through a Transducer to the Sink:

7. Stream state transformation uses a brand-new implementation based on a new data structure: Pull

 

Most of fs2's transformation functions take Chunk-wise processing into account. Let's first see how fs2 represents Chunk data:

 

 

 

Now myTake and myFilter no longer need the pure upgrade:

4. Simpler stream transformation primitives

 
