2015/08/27

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2

這篇同步發佈在我的BlogGist

前情提要

請看[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1Gist版本

怎麼使用Process?

使用Monad的功能map, flatMap

首先,Process是Monad,所以Monad使用上的特性也是Process好用的特色,因此map, flatMap…等這些Moand的特性都能使用。

case class Article(id:String,title:String,body:String)
val articles : Process[Task,Article] = Process(Article("1","Hi","Scalaz Process"))
//articles: scalaz.stream.Process[scalaz.concurrent.Task,Article] = Emit(WrappedArray(Article(1,Hi,Scalaz Process)))

scala> articles.map{println(_)}
Article(1,Hi,Scalaz Process)
//scalaz.stream.Process[scalaz.concurrent.Task,Unit] = Emit(Vector(()))

scala> articles.flatMap{ a=> Process(a.id) }.runLog.run
//IndexedSeq[String] = Vector(1)

Process.scan

在Stream的處理中,蠻常有機會是會要參考到前一個值,也就是causal function,也就是以之前的值與目前的值來處理的function,Process.scan就是Process中實作的causal function,他的使用方式就像是fold一樣,我們會定義一個function,傳入兩個參數,分別是之前運算完的結果,和當下的值。

val nums : Process[Task,Int] = Process.range(0,10)
// nums.runLog.run
// IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

val sum = nums.scan(0)( (past,present) => past+present )
// sum.runLog.run
//IndexedSeq[Int] = Vector(0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45)

先代入起始值0scan當作第一個past,然後scan會將 (0,0), (0,1), (1,2), (3,3), (6,4)…分別代入(past,present) => past+present這個function中,所以最後的值就會是第一個0與所有的0到9的總合。

將Stream結合起來的好工具 - wye

要將Stream結合起來的作法有很多,這裡介紹一個叫作wye的工具,他裡面實作了許多能將Stream結合的各種function,像是merge, either, yip(這就像是zip一樣), unboundedQueue, boundedQueue, timedQueue

scala> val p1 : Process[Task,String] = Process("1","2","3","4")
scala> val p2 : Process[Task,String] = Process("a","b","c","d")

scala> p1.wye(p2)(wye.merge).runLog.run
res: IndexedSeq[String] = Vector(1, 2, 3, 4, a, b, c, d)

scala> p1.wye(p2)(wye.either).runLog.run
res: IndexedSeq[scalaz.\/[String,String]] = Vector(\/-(a), \/-(b), \/-(c), \/-(d), -\/(1), -\/(2), -\/(3), -\/(4))

scala> p1.wye(p2)(wye.yip).runLog.run
res: IndexedSeq[(String, String)] = Vector((1,a), (2,b), (3,c), (4,d))

其中有一個很實用的是dynamic,他的interface是def dynamic[I,I2](f: I => wye.Request, g: I2 => wye.Request): Wye[I,I2,ReceiveY[I,I2]],他讓你去決定當收到 左邊 或 右邊 的stream傳入的值的時候,要怎麼去處理,並且最後決定下一個是要拿 左邊(wye.Request.L) 或是 右邊(wye.Request.R)的。

// define the process logic for wye.dynamic, here is always change to the other side.
val w = wye.dynamic( (_:Any) => wye.Request.R, (_:Any) => wye.Request.L)

scala> p1.wye(p2)(w).runLog.run
res: IndexedSeq[scalaz.stream.ReceiveY[Any,Any]] = Vector(ReceiveL(1), ReceiveR(a), ReceiveL(2), ReceiveR(b), ReceiveL(3), ReceiveR(c), ReceiveL(4), ReceiveR(
d))

p1.wye(p2)(w).runLog.run.filter( _.isR )
res: IndexedSeq[scalaz.stream.ReceiveY[Any,Any]] = Vector(ReceiveR(a), ReceiveR(b), ReceiveR(c), ReceiveR(d))

scala> p1.wye(p2)(w).runLog.run.map(_ match {
     |       case ReceiveY.ReceiveR(x) => "R:"+x
     |       case ReceiveY.ReceiveL(x) => "L:"+x
     | }).map(x=>println(s"got $x"))

got L:1
got R:a
got L:2
got R:b
got L:3
got R:c
got L:4
got R:d

至於其他的unboundedQueue, boundedQueue, timedQueue…等別的用法,就請參考wye的原始碼嘍!

Reference

2015/08/26

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 1

這篇同步發佈在我的BlogGist

何時用到Stream(串流)?

一提到Stream(串流),第一個想到的就是影音的類型,這種應用面是非常的普遍也很容易理解。

而我第一次在project中想嘗試使用Streaming的時候,一直有個困擾,我的protocol是HTTP+JSON,我就是要等到整個payload都收下來了,才能Deserialize成Json object啊,這樣stream對我的好處是什麼?

前陣子有同事提到一個例子,說Java8的Performance比Scala好很多!
仔細發現,其實他在測試時用到的是Java8的Stream,但是卻拿Scala一般的Collection來比,就好比下面這樣的結果。(最後當然是幫Scala版本加個toStream兩個的差距就差不多了。)

def measure(f:()=>Any)={
  val s=System.currentTimeMillis
  f()
  println(s"elapsed: ${System.currentTimeMillis-s}ms")
}

scala> measure{ () => (1 to 20000000).toStream.filter(_%2==0).filter(_>200).head }
elapsed: 1ms

scala> measure{ () => (1 to 20000000).filter(_%2==0).filter(_>200).head }
elapsed: 983ms

一看就知道這是一個不公平的結果!!! 一個是乖乖把所有的結果整理完,然後往後傳,通通filter過之後,只取第一個結果,另一個是streaming(串流)的方式,把資料用”流”的方式一步步的流過filter與head,當第一個match的結果出現之後就結束了。

但這個例子讓我們知道,原來stream除了從合適的protocol處理就能有幫助之外,其實就算在系統內部的資料傳遞與處理的過程中,在適當的使用下,一樣是能提升效率的。

Streaming的選擇

最近Streaming的處理方式愈來愈紅,Akka在上個月(2015.07)也正式Release Akka Streams & Http Experimental 1.0,而Twitter Util也有提供AsyncStream,結合在Finatra上使用,看起來也不錯!

不過今天主角是Scalaz Stream,主要參考這篇Scalaz Stream - a Functional Reactive Programming Tutorial

以下來簡單的介紹一下Scalaz Stream的用法

Scalaz Stream

什麼是Process?

Scalaz Stream中,最重要的核心物件就是Process,這個Process[F[_], X]主要是定義了串流中的物件為X型態,並且透過F[_]這個Monad來包裝他,而在實際的使用上,通常會使用Scalaz的Task (請參考我上一篇寫的[ScaVa->Scala] Scalaz Task 取代Scala Future來進行非同步處理的另一個選擇)。

Process提供了一些run的方式,並且透過F[_]這個effect system來包裝他,可以想像F[_]是要處理這些X型態值的driver。

trait Process[F[_], X] {
  ...
  def run(implicit m: Monad[F]): F[Unit]
  def runLast(implicit m: Monad[F]): F[X]
  def runLog(implicit m: Monad[F]): F[IndexedSeq[X]]
}

在使用上的觀念要知道Process是個Stream,所以他的值會是一直”流”進來的,當使用run的時候,他回傳的是F[Unit],所以他是忽略你的回傳值,而主要的使用是用F[_]的特性來處理這些值,例如使用Task時,就可以使用他的timed, handleWith, onFinished之類的function。

而使用runLast的時候,會執行整個串流,一直到最後一個值並只回傳最後一個值。

使用runLog會將整所有的值都包在IndexedSeq[X]裡面,讓你可以一口氣取得所有串流中的值,這也是最耗記憶體的,就是把所有的結果都收集起來。

若以Listmonad來當作F[_],就會像以下這樣的例子:

scala> val p: Process[List,Int] = Process.range(0,10)

scala> p.run
res5: List[Unit] = List(())

scala> p.runLast
res10: List[Option[Int]] = List(Some(9))

scala> p.runLog
res11: List[IndexedSeq[Int]] = List(Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

產生Process

這裡舉幾種產生Process方式的例子:

1. 要把現有的值包成Process

val p = Process(1,2,3,4,5)
//p: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3, 4, 5))

如果你直接嘗試去run這個p,會得到以下的錯誤:

scala> p.run
<console>:16: error: could not find implicit value for parameter C: scalaz.Catchable[F2]
       p.run
         ^

因為他缺少了driver,也就是F[_],所以他不知道該怎麼去處理他,這時候他的型態是Process0,只是個wrapper,所以你可以直接toList取得你要的值,或是告訴他你的driver是什麼。

val p = Process(1,2,3,4,5):Process[Task,Int]
//p: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(WrappedArray(1, 2, 3, 4, 5))
p.runLog.run
//IndexedSeq[Int] = Vector(1, 2, 3, 4, 5)

2. 使用scalaz.stream.io來產生

scalaz.stream.io有將一些io的操作包裝起來,讓你方便直接使用,例如讀檔案:

val lns: Process[Task,String] = io.linesR("inputfile.txt")

3. 使用Task.async來包裝你的執行動作,再透過Process.eval來產生

這種用法,很適合將你本來要進行的處理(一些java/scala library的存取、或是對server的request)給包成Task,再透過Process來將這裡面的結果包成Stream。

val f: Task[A] = Task.async { ... } //Do something to create an object
val evalF: Process[Task,A] = Process.eval(f)

這類型的有eval,repeat,repeatEval,suspend等…

4. 使用scalaz.stream.async.Queue

scalaz.stream實作了Queue來讓你從一邊把串流的值給不斷的塞進去,而另一邊可以直接以Process來接收這些串流的值。

import scalaz.stream._

val q = async.unboundedQueue[String]

while (true) {
  val inputString = readFromTheNetwork()
  q.enqueueOne( inputString ).run
}

//...elsewhere...

val stringsFromTheNetwork: Process[Task,String] = q.dequeue

5. 另外一種方式 - signal

這裡的signalqueue不同的地方是,這個的implementation是針對signal,所以他保證你能得到最後的值,但中間過程中若有許多次的變動,不一定每次都會拿得到,若你是希望每一個過程都要拿得到的話,要使用queue

val signal = async.signal[Boolean]
val signalChanges: Process[Task,Boolean] = signal.discrete

//Thread 1
signal.set(true).run // Time = 1
signal.set(true).run // Time = 2
signal.set(false).run //Time = 3
...
//Thread 2
signalChanges.map(x => {
  println("" + x + " -> " + System.currentTimeMillis)
  }).run.run
// Will print:
// true -> 1
// false -> 3

6. Process.awakeEvery定時產生Process的通知

import scala.concurrent.duration._
val clock = Process.awakeEvery(1 seconds)

Nest

[ScaVa->Scala] Scalaz Stream 串流好朋友 part 2 BlogGist

Reference

2015/08/24

情人節 台中三日遊

這次情人節,特別請了兩天假(星期四和五)避開人潮,與小柔到台中放鬆一下,玩個三天兩夜。
本來有考慮玩到四天,不過因為有颱風要來(雖然最後好像只是擦身而過),所以最後只有三天兩夜。
這次的行程都是小柔幫忙找的,其實找了很多個點,也建了Google MyMap,但因為太隨興了,很多點沒有跑到,辛苦小柔了。

Day 1 紙箱王 -> 碧根行館 -> 逢甲夜市

第一天我們大約中午從台北出發,先買了簡單的早餐在車上吃,接下來就直接出發到我們的第一個景點-紙箱王(其實也跑錯了,跑到了他的餐廳,但還是有蠻多很特別的紙箱製的作品)


再來旁邊剛好有大魯閣,跑去上個廁所又順便玩了一下遊樂設施!


算算時間差不多可以check-in了,我們就直接到碧根行館,其實本來想換到Hotel 7,但是是透過Booking.com,而免費取消的時間過了,只好住這間了。我們是Booking.com的特惠價格專案,一晚1999元,所以飯店跟我們說沒有附車位,只好花200元停在他們的特約停車場,大約走路7~8分鐘。(就在Hotel 7旁邊)

其實碧根行館就在逢甲大門口,夜市裡面! 我們逛一逛還可以先回去休息一下放個東西再繼續出來逛,地點真的滿分沒話說!


這是從房間窗戶往外面的畫面,對面就是i-bike站!

晚上就是瘋逛的逛逢甲夜市了!

這棉花糖做的有夠醜!

逢甲夜市的小吃們!這個雞排真的很好吃耶!


不小心逛到一間拼圖店,1000片的拉拉熊拼圖居然只要500元,加上木框999元,當然就直接買了!(這時候飯店近的優點馬上體會到,不用走太遠就可以把東西拿回去放。)


Day2 無為草堂 -> 精明一街 -> 輕井澤 -> 沐蘭

第二天check-out的時候也差不多中午了,我們就直接出到去無為草堂吃午餐。


餐點很好吃! 五穀米也很健康的感覺!


裡面的風格真的很讚,可以花20元跟櫃台買一大杯的魚飼料,小柔餵魚餵得很開心!


精明一街沒想像中的好逛,而且我出來玩的時候,又因為有一些事在處理,很常時間用手機,還跟學長通電話講了好一陣子,委曲小柔在旁邊等我。


小柔找到一間自己手作點心的烘焙教室,我們都很有興趣,但因為時間不夠,所以沒辦法體驗,只好下次再找機會來。


晚上跟以前同學碰面吃飯,超久沒聯絡的修哥,謝謝他請我們吃輕井澤,聊了蠻多,期待下次的聚會。

吃完飯後,就去沐蘭Check-in,之前旅展買的住宿休息劵+免費升等券,入住楓舞B,星期五晚上入住要再加600元,總共3千初頭,還蠻划算的。



房間很大很漂亮!


 Day 3 宮原眼科 -> 清水財伯米糕 -> 高美濕地 -> 西湖休息站 -> 回家

這次去台中車站旁的宮原眼科,運氣很好的在旁邊找到路邊停車格 (雖然也繞了三圈,但居然在星期六的中午找到路邊停車格,Lucky!)

滿滿的人要排冰淇淋,我個人是還好,小柔那個來也不能吃冰,所以就不排了!

伴手禮的店,我們買了小柔哥哥最喜歡吃的乳酪蛋糕,剛好他今天生日,帶回去給他吃。

接下來出發往清水走,先去吃個有名的米糕!不錯!醬汁很香很夠味!



本來的計劃是小柔想要去高美濕地抓蛤蠣,但太久沒來了,不知道現在已經在進行生態保護,架了個高高的步道,人不能下去…,我們還帶了網子過去… (不過看到旁邊有人拿著路邊攤在賣的小朋友挖沙組,他應該比我們還嘔!路邊攤還在賣那個實在是詐欺啊!)


不過這次看到滿滿的螃蟹跟一群一群的魚苗,這裡生態的保護真的有做有差。
結束清水高美濕地的曬太陽行程後,就要出發回台北了,路上經過西湖休息站,因為太想睡覺,就先在這裡休息一下。

西湖休息站有讓我驚訝,覺得不比清水差啊!


有人說怎麼會有奶頭…我就進來幫奶頭餅拍了幾張照…



童心未泯的小柔!

出發前買了新東陽的熊厚呷鹽酥雞,熱的時候還不錯,但冷了就蠻難吃的(有些感覺油溫夠高的冷了還是會好吃)。


簡簡單單的三天台中行,走著非常輕鬆的行程,但感覺很不滿足,覺得不久之後應該又要再來一趟了!





My World