[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2
前情提要
請看[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1或Gist版本。
怎麼使用Process?
使用Monad的功能map
, flatMap
首先,Process
是Monad,所以Monad使用上的特性也是Process
好用的特色,因此map
, flatMap
…等這些Moand的特性都能使用。
case class Article(id:String,title:String,body:String)
val articles : Process[Task,Article] = Process(Article("1","Hi","Scalaz Process"))
//articles: scalaz.stream.Process[scalaz.concurrent.Task,Article] = Emit(WrappedArray(Article(1,Hi,Scalaz Process)))
scala> articles.map{println(_)}
Article(1,Hi,Scalaz Process)
//scalaz.stream.Process[scalaz.concurrent.Task,Unit] = Emit(Vector(()))
scala> articles.flatMap{ a=> Process(a.id) }.runLog.run
//IndexedSeq[String] = Vector(1)
Process.scan
在Stream的處理中,蠻常有機會是會要參考到前一個值,也就是causal function,也就是以之前的值與目前的值來處理的function,Process.scan
就是Process中實作的causal function,他的使用方式就像是fold
一樣,我們會定義一個function,傳入兩個參數,分別是之前運算完的結果,和當下的值。
val nums : Process[Task,Int] = Process.range(0,10)
// nums.runLog.run
// IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
val sum = nums.scan(0)( (past,present) => past+present )
// sum.runLog.run
//IndexedSeq[Int] = Vector(0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45)
先代入起始值0
給scan
當作第一個past,然後scan
會將 (0,0), (0,1), (1,2), (3,3), (6,4)…分別代入(past,present) => past+present
這個function中,所以最後的值就會是第一個0與所有的0到9的總合。
將Stream結合起來的好工具 - wye
要將Stream結合起來的作法有很多,這裡介紹一個叫作wye
的工具,他裡面實作了許多能將Stream結合的各種function,像是merge
, either
, yip
(這就像是zip
一樣), unboundedQueue
, boundedQueue
, timedQueue
…
scala> val p1 : Process[Task,String] = Process("1","2","3","4")
scala> val p2 : Process[Task,String] = Process("a","b","c","d")
scala> p1.wye(p2)(wye.merge).runLog.run
res: IndexedSeq[String] = Vector(1, 2, 3, 4, a, b, c, d)
scala> p1.wye(p2)(wye.either).runLog.run
res: IndexedSeq[scalaz.\/[String,String]] = Vector(\/-(a), \/-(b), \/-(c), \/-(d), -\/(1), -\/(2), -\/(3), -\/(4))
scala> p1.wye(p2)(wye.yip).runLog.run
res: IndexedSeq[(String, String)] = Vector((1,a), (2,b), (3,c), (4,d))
其中有一個很實用的是dynamic
,他的interface是def dynamic[I,I2](f: I => wye.Request, g: I2 => wye.Request): Wye[I,I2,ReceiveY[I,I2]]
,他讓你去決定當收到 左邊 或 右邊 的stream傳入的值的時候,要怎麼去處理,並且最後決定下一個是要拿 左邊(wye.Request.L
) 或是 右邊(wye.Request.R
)的。
// define the process logic for wye.dynamic, here is always change to the other side.
val w = wye.dynamic( (_:Any) => wye.Request.R, (_:Any) => wye.Request.L)
scala> p1.wye(p2)(w).runLog.run
res: IndexedSeq[scalaz.stream.ReceiveY[Any,Any]] = Vector(ReceiveL(1), ReceiveR(a), ReceiveL(2), ReceiveR(b), ReceiveL(3), ReceiveR(c), ReceiveL(4), ReceiveR(
d))
p1.wye(p2)(w).runLog.run.filter( _.isR )
res: IndexedSeq[scalaz.stream.ReceiveY[Any,Any]] = Vector(ReceiveR(a), ReceiveR(b), ReceiveR(c), ReceiveR(d))
scala> p1.wye(p2)(w).runLog.run.map(_ match {
| case ReceiveY.ReceiveR(x) => "R:"+x
| case ReceiveY.ReceiveL(x) => "L:"+x
| }).map(x=>println(s"got $x"))
got L:1
got R:a
got L:2
got R:b
got L:3
got R:c
got L:4
got R:d
至於其他的unboundedQueue
, boundedQueue
, timedQueue
…等別的用法,就請參考wye
的原始碼嘍!