2015/08/19

[ScaVa->Scala] Scalaz Task 取代Scala Future來進行非同步處理的另一個選擇

[ScaVa->Scala] Scalaz Task 取代Scala Future來進行非同步處理的另一個選擇

[ScaVa->Scala] Scalaz Task 取代Scala Future來進行非同步處理的另一個選擇

這篇同步發佈在我的BlogGist

Scala的Future用起來有什麼問題?

在Scala中,我們要處理asynchronous computations的時候應該都知道有Future這個好用的東西,但Future monad常讓我在進行error handling的時候有點困擾,我本來的回傳值若是有”成功”或”失敗”兩種case的時候,我們可以用Scalaz的disjuction或是Scalactic的Or來處理,但是加上Future後的結果會變成:

// Scalaz Disjunction
getName1() : Future[Exception \/ String]
res: Future[scalaz.\/[Exception,String]]

// Scalactic
getName2() : Future[String Or One[Exception]]
res: Future[org.scalactic.Or[String,org.scalactic.Every[Exception]]]

這樣有什麼問題呢?
- 本來在Future中執行的computation,有可能有”成功”或”失敗”(就是Disjunction或Or所傳回來的)
- Future的處理過程,也有可能有”成功”或”失敗”

// Scalaz
getName1().onComplete{ resp =>
    resp match {
        case Success(nameOrError) => nameOrError match {
            case Right(name) => name
            case Left(ex) => // handle error from computation
        }
        case Failure(ex) => // handle error from future process
    }
}

// Scalactic
getName2().onComplete{ resp =>
    resp match {
        case Success(nameOrError) => nameOrError match {
            case Good(name) => name
            case Bad(One(ex)) => // handle error from computation
        }
        case Failure(ex) => // handle error from future process
    }
}

這樣在處理的時候,要分兩層去處理,先去處理Future的失敗狀況,再去處理拿到的值裡面的失敗狀況。

Scalaz Task

Scalaz的concurrent模組 (不是包在Scalaz的core哦! 所以要使用的要另外載入這個模組) 有提供Task這個好用的東西,他可以取代Future並提供一些比Future還好用的東西。

底下是一些Scalaz Task中的特性,我稍微列幾點,其它的請參考 Scalaz Task - the missing documentation

Task.apply

def getName(fail:Boolean) : Task[String] = {
    Task{ fail ? {throw new Exception("error")} | "joe" }
}

getName(false).attemptRun
res: scalaz.\/[Throwable,String] = \/-(joe)

getName(false).attemptRun.getOrElse("noName")
res: String = joe

這裡的Task跟Future有點不一樣的是,在apply的時候,Task是lazy的,也就是你必須使用run或是attemptRun之類的,才會讓他真的去執行,但是run就跟Option的get一樣,是危險的! 不建議這樣直接使用。

Task.nowTask.fail

Task.now("joe")
res: scalaz.concurrent.Task[String]

這個應該就是applicative的pure吧,直接把native型態的值,轉成Functor型態的值。

Task.fail(new Exception("err"))
res: scalaz.concurrent.Task[Nothing]

這裡要注意的是,fail這裡定義的回傳型態就是Task[Nothing],所以如果你要指定某個型態的Task的話,可以用下面這種方式來試。

Task[String](throw new Exception("err"))
res: scalaz.concurrent.Task[String]

另外,Task可以使用or來串接多個Task,而他的邏輯就是or,也就是有值的話就會回傳值,若通通都是錯誤的話,他會回傳最後一個Exception。

Task.fail(new Exception("err")) or Task.now("joe")
res: scalaz.concurrent.Task[String]

Task.async

async是幫你把callback API包成monadic API,讓你在處理一些callback API的時候還能functional programming啊~

  def async[A](register: ((Throwable \/ A) => Unit) => Unit): Task[A] =
    new Task(Future.async(register))

這裡可以看到,async吃一個命名為register的function,這個register function傳入的值是另一個匿名的function,然後回傳型態是Unit;這個匿名的function傳入型態是Throwable \/ A,他的回傳也是Unit。

這東西真的很棒,我們有時難免會使用到Java開發的Library,而總是會有一些callback的介面要去實作,透過async包起來之後,就可以再享受到Monad的好處。

以下的範例,是來自Scalaz Task - the missing documentation,這裡以Java的AsyncHttpClient為例,告訴我們如果包裝一個callback API成為Task,進而就可以使用Monad的flatMap這樣的function。

asyncHttp.prepareGet("http://google.com", new AsyncCompletionHandler[Response] {
  def onComplete(r: Response) = ...
  def onError(e: Throwable) = ...
})

def get(s: String) = 
  Task.async(k => asyncHttp.prepareGet(s, toHandler(k)))

def toHandler[A](k: (Throwable \/ A)) = new AsyncCompletionHandler[A] {
  def onComplete(r: Response) = k(...)
  def onError(e: Throwable) = k(...)
}

// usage:
get("http://google.com").flatMap(response => ...)

Task.gatherUnordered, Task.reduceUnordered

gatherUnorderedreduceUnordered針對回傳型態是List[T]的implementation,而gatherUnordered就像是Scala Future的Future.traverseFuture.sequence,他能幫你將List of Tasks的回傳值給合併起來,例如

val tasks = (1 |-> 5).map(n => Task{n})
Task.gatherUnordered(tasks).run
res: List[Int] = List(1,3,4,2,5)
// 注意回傳的結果不是照順序的,因為是平行處理的

reduceUnordered就是當你回傳的結果不是List而是要其他的型態的時候,可以自己implement。

Task.onFinish

這個onFinish的回傳值是Option[Throwable] => Task[Unit]可以讓你針對處理結束時進行handle,例如想要close一些resource等。

Task.timed

這東西很好用!就是幫你handle timeout,之前在寫Future的時候,必需使用Await來處理timeout的狀況,而且寫起來蠻麻煩的,使用timed寫起來就簡單多了,而且有錯誤的話(會丟java.util.concurrent.TimeoutException),直接在原本Task回傳的Disjunction中就可以handle了。

import scala.concurrent.duration._
Task { Thread.sleep(1000); "Yo" }.timed(100 millis).attemptRun
res: scalaz.\/[Throwable,String] = -\/(java.util.concurrent.TimeoutException)

Task.handleWith

這就像是使用onFailure來處理Scala的Future一樣,看你要針對某些類型的exception做什麼樣特別的處理。

Task[String] { throw new Exception("yo") }.handleWith{
    case Exception(ex) => // do something
}

Reference

沒有留言:

My World