Scala notes - Futures - 3 (Combinators and Async)

In the previous parts of this post, we discussed Futures and Promises. In this last part, we'll compose Futures using their powerful combinators.

Composing Futures :

In the first post, we saw how to extract a value from a Future using onComplete, foreach and, in testcases, Await.result. Extracting a value from a single Future is good, but we often spawn more than one asynchronous operation and wait on multiple Futures to arrive at the final result. Even better, sometimes the result of one Future is fed into another, or into a chain of Futures.

Future is a Monad. (I am sorry to drop the M-bomb here; I will take a stab at explaining my understanding of what a Monoid, Functor, Monad and an Applicative are later.) But for now, let's live with this crude explanation:

  1. Future is a container of a value of some type (i.e., it accepts a type as an argument and can't exist without it). You can have a Future[Int] or Future[String] or Future[AwesomeClass] - you can't just have a plain Future. A fancy term for this is type constructor. To compare, a List is a type constructor (and a Monad as well). A List is a container of values of type Int, String or any other type. A List/Future without a contained type does not exist.

  2. Future has flatMap and unit functions (and consequently a map function too).

The reason I brought this up is that instead of using the onComplete callback or the foreach, we could simply map or flatMap the result of the Future just like we would do it against an Option or a List.
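
To make the comparison concrete, here is a minimal sketch that puts an Option and a Future side by side. It assumes the global ExecutionContext and uses Future.successful in the role of "unit"; the object name FutureAsMonadSketch is made up for illustration.

```language-scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical object, for illustration only
object FutureAsMonadSketch {
  // "unit": lift a plain value into the container
  def someInt: Option[Int] = Option(20)
  def futureInt: Future[Int] = Future.successful(20)

  // map transforms the contained value without unwrapping it
  def doubledOption: Option[Int] = someInt.map(_ * 2)
  def doubledFuture: Future[Int] = futureInt.map(_ * 2)

  // flatMap chains a step that itself returns the container
  def chainedOption: Option[Int] = someInt.flatMap(v => Option(v + 1))
  def chainedFuture: Future[Int] = futureInt.flatMap(v => Future(v + 1))

  def main(args: Array[String]): Unit = {
    println(doubledOption)                         // prints "Some(40)"
    println(Await.result(doubledFuture, 1.second)) // prints "40"
    println(chainedOption)                         // prints "Some(21)"
    println(Await.result(chainedFuture, 1.second)) // prints "21"
  }
}
```

The only difference on the Future side is that the value may not be there yet, so the functions passed to map and flatMap run once the value arrives.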

Now, let's look at the map and the flatMap combinators.

Mapping Futures that execute sequentially

Let's consider this simple task, which adds three numbers which are asynchronously calculated after some interval.

Warning : The following code is messy and executes the Futures sequentially

Code
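import scala.concurrent.Future
// Supplies the implicit ExecutionContext that Future { ... }, map and flatMap need
import scala.concurrent.ExecutionContext.Implicits.global

class FutureCombinators {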

  def sumOfThreeNumbersSequentialMap(): Future[Int] = {
    Future {
      Thread.sleep(1000)
      1
    }.flatMap { oneValue =>
      Future {
        Thread.sleep(2000)
        2
      }.flatMap { twoValue =>
        Future {
          Thread.sleep(3000)
          3
        }.map { thirdValue =>
          oneValue + twoValue + thirdValue
        }
      }
    }
  }
...
...

The first Future returns a 1 after 1 second, the second Future returns a 2 after 2 seconds and the third Future returns a 3 after 3 seconds. The nested block finally calculates the sum of three values and returns one single Future[Int].

Testcase

For the sake of calculating the time taken to compute the values, we have a small utility function (inside the ConcurrentUtils trait) called timed which calculates and prints the time taken by a block.

We use Await.result to do a blocking wait on the result of futureCombinators.sumOfThreeNumbersSequentialMap. We also time the total execution and print it.

class FutureCombinatorsTest extends FunSpec with Matchers with ConcurrentUtils {

  describe("Futures") {
    it("could be composed using map") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds))
      result shouldBe 6
    }
  }
...
...
}

trait ConcurrentUtils {
  def timed[T](block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    val duration = System.currentTimeMillis() - start
    println(s"Time taken : $duration")
    result
  }
}

Output

Time taken : 6049

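The function took a little over 6 seconds (1 + 2 + 3) to execute, which indicates that the Futures are executed in sequence: each Future is created inside the flatMap callback of the previous one, so it does not even start until the previous one has completed.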

Using for-comprehension syntactic sugar instead of Map

Scala gives a great way to work with classes that have map and flatMap (Monads): for comprehensions. For comprehensions are just syntactic sugar, which gets de-sugared into flatMap and map.

The following code means exactly the same as the above, except that the uglification is done by the Scala compiler.

Code

```language-scala
  def sumOfThreeNumbersSequentialForComprehension(): Future[Int] = {
    for {
      localOne <- Future {
        Thread.sleep(1000)
        1
      }
      localTwo <- Future {
        Thread.sleep(2000)
        2
      }
      localThree <- Future {
        Thread.sleep(3000)
        3
      }
    } yield localOne + localTwo + localThree
  }
```

######Testcase

It's the same as above. 

```language-scala
  it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialForComprehension(), 7 seconds))
      result shouldBe 6
    }
```

**Output**

```
Time taken : 6012
```

#####Executing Futures in parallel

As we saw, the previous block of code runs the three Futures sequentially and therefore takes a total of 6 seconds to finish the computation. That isn't good. Our Futures need to run in parallel. A Future starts running as soon as it is created, so all we need to do is create the three Futures up front: extract the Future blocks and declare them separately, before composing them.

######Code 

```language-scala

 val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }

  val twoFuture: Future[Int] = Future {
    Thread.sleep(2000)
    2
  }

  val threeFuture: Future[Int] = Future {
    Thread.sleep(3000)
    3
  }
```

Now, let's use for-comprehension to calculate the value. 

```language-scala
def sumOfThreeNumbersParallel(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture
    threeValue <- threeFuture
} yield oneValue + twoValue + threeValue

```

######Testcase

Let's time the computation and assert the correct value using the following testcase.

```language-scala
 describe("Futures that are executed in parallel") {
    it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds))
      result shouldBe 6
    }
  }
```

**Output**

```
Time taken : 3005
```

As we can see, `sumOfThreeNumbersParallel` takes almost the same time as the longest Future (`threeFuture`), which is 3 seconds.
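
The difference between the two styles comes down to *when* each Future is created. The following is a small self-contained sketch of that point, not the code from this post: the `slow` helper, the object name and the 300 ms delays are made up for illustration, and it assumes the global `ExecutionContext` can run two tasks at once.

```language-scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical object, for illustration only
object EagerStartSketch {
  // Hypothetical helper: yields `value` after sleeping for `millis`.
  // `blocking` hints to the global pool that this thread is about to block.
  def slow(value: Int, millis: Long): Future[Int] = Future {
    blocking(Thread.sleep(millis))
    value
  }

  // Each Future is created inside a generator, so the second one
  // starts only after the first has completed (roughly 300 + 300 ms).
  def sequentialSum(): Future[Int] = for {
    a <- slow(1, 300)
    b <- slow(2, 300)
  } yield a + b

  // Both Futures are created up front, so they are already running
  // when the for comprehension composes them (roughly 300 ms).
  def parallelSum(): Future[Int] = {
    val fa = slow(1, 300)
    val fb = slow(2, 300)
    for {
      a <- fa
      b <- fb
    } yield a + b
  }

  // Runs the block, prints how long it took and returns its result
  def timed[T](label: String)(block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    println(s"$label took ${System.currentTimeMillis() - start} ms")
    result
  }

  def main(args: Array[String]): Unit = {
    println(timed("sequential")(Await.result(sequentialSum(), 5.seconds)))
    println(timed("parallel")(Await.result(parallelSum(), 5.seconds)))
  }
}
```

Both functions yield 3; only the elapsed time differs.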

Just for the sake of comparison, the above code could be written without using for-comprehension as : 

```language-scala

def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue =>
    twoFuture.flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
}

```

#####Guards in for-comprehensions

Just like we add a guarded `if` clause in for-comprehensions over `List` and other collections (aka other Monadic types), we could add guards against the generators of `Future` as well.  The following `if` guard checks if the value returned by the `twoFuture` is more than 1, which it is. 


```language-scala
  def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 1
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue
```


These guards get de-sugared as `withFilter` like so (I am 90% sure nobody wants to write this way): 

```language-scala
def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue =>
    twoFuture.withFilter(_ > 1).flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
  }
```  


#####Guards in for-comprehensions - Failure case

If the guard evaluates to false, the generator yields a failure: the resulting `Future` fails with a `NoSuchElementException`, which is thrown to the caller when the result is awaited. Let's change the guard condition to evaluate to false.

```language-scala
  def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue
```

**Output**

```
Future.filter predicate is not satisfied
java.util.NoSuchElementException: Future.filter predicate is not satisfied
	at scala.concurrent.Future$$anonfun$filter$1.apply(Future.scala:280)
	at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
	at scala.util.Try$.apply(Try.scala:192)
	at scala.util.Success.map(Try.scala:237)
	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
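
The exception above only surfaces as a thrown exception because the caller uses `Await.result`. As a rough sketch of looking at the same failure as a value instead, the snippet below uses `Await.ready`, which waits for completion without rethrowing, and then reads the `Try` held by the completed Future. The `guarded` and `outcome` functions and the object name are hypothetical, not part of the post's code.

```language-scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

// Hypothetical object, for illustration only
object GuardFailureSketch {
  // The guard passes only if the value (2) is greater than `limit`
  def guarded(limit: Int): Future[Int] = for {
    value <- Future(2) if value > limit
  } yield value

  // Await.ready blocks until completion but does not rethrow;
  // `value` then exposes the outcome as Option[Try[Int]]
  def outcome(limit: Int): Try[Int] =
    Await.ready(guarded(limit), 1.second).value.get

  def main(args: Array[String]): Unit = {
    println(outcome(1)) // prints "Success(2)"
    outcome(2) match {
      case Failure(e: NoSuchElementException) =>
        println(s"guard rejected the value: ${e.getMessage}")
      case other =>
        println(s"unexpected outcome: $other")
    }
  }
}
```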

#####Exception handling

Just like the `NoSuchElementException` thrown by the guard, code executing asynchronously inside a `Future` could throw a variety of exceptions.  While one may argue that Exceptions are not very FP-like, chances are that with a distributed application or through usage of Java libraries inside your `Future`, exceptions do happen.

######Code
Both functions below result in exceptions - the first one fails with a `NoSuchElementException` and the second one with a `LegacyException`. 

```language-scala

  //NoSuchElementException
  def throwsNoSuchElementIfGuardFails(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

  //LegacyException

  val futureCallingLegacyCode: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Danger! Danger!")
  }

  def throwsExceptionFromComputation(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCode
  } yield oneValue + futureThrowingException

case class LegacyException(msg: String) extends Exception(msg)
```

######Testcases

```language-scala
 describe("Futures that throw exception") {
    it("could blow up on the caller code when guard fails") {
      val futureCombinators = new FutureCombinators
      intercept[NoSuchElementException] {
        val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds))
      }
    }

    it("could blow up on the caller code when exception comes from a computation executed inside the Future") {
      val futureCombinators = new FutureCombinators
      intercept[LegacyException] {
        val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds))
      }
    }
...
...

```

*Note that if even one of the Futures results in an exception, the entire composed computation fails and the exception is propagated to the caller.*  


###Recovering from Exception :

#####Using `recover` 


If a Future fails with a non-fatal exception (one matched by [`scala.util.control.NonFatal`](http://www.scala-lang.org/api/2.11.6/index.html#scala.util.control.NonFatal$)) and we want a default fallback value instead of propagating the error to the caller, we can use the `recover` function.  `recover` is much like a catch block. 

Let's modify the above function `throwsExceptionFromComputation` which throws a `LegacyException`. The `recover` function accepts a `PartialFunction` that maps from `Throwable` to the type that the `Future` wraps.

######Code

In the below code, if the `futureCallingLegacyCode` throws an `Exception` (which it does), the value that is the result of this computation is set to be *200*.  If it hadn't thrown an Exception, the resulting value would be the result of that computation itself.


```language-scala
  val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover {
    case LegacyException(msg) => 200
  }

  def recoversFromExceptionUsingRecover(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecover
  } yield oneValue + futureThrowingException
```


**To reiterate, if the original `Future` yields a successful value, the `recover` block is never executed.  Also, if the `PartialFunction` inside the `recover` function does not handle the exception that is originally thrown, the exception gets propagated to the caller.**
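
The three situations described here (handler matches, handler does not match, original Future succeeds) can be sketched in isolation as follows. This is a standalone illustration rather than the post's code: the object name and functions are made up, and `LegacyException` is redefined locally so that the snippet compiles on its own.

```language-scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical object, for illustration only
object RecoverSketch {
  // Local copy of the exception type, so the sketch is self-contained
  case class LegacyException(msg: String) extends Exception(msg)

  def failing(): Future[Int] = Future { throw LegacyException("Danger! Danger!") }

  // The handler matches the failure, so the failure is replaced by 200
  def handled(): Future[Int] = failing().recover { case LegacyException(_) => 200 }

  // The handler is not defined for LegacyException, so the failure passes through
  def unhandled(): Future[Int] = failing().recover { case _: ArithmeticException => 0 }

  // The original Future succeeds, so the handler is never consulted
  def untouched(): Future[Int] = Future(1).recover { case _ => -1 }

  // Blocks for the result and captures a thrown exception as a Failure
  def resultOf[T](f: Future[T]): Try[T] = Try(Await.result(f, 2.seconds))

  def main(args: Array[String]): Unit = {
    println(resultOf(handled()))   // prints "Success(200)"
    println(resultOf(unhandled())) // a Failure wrapping the LegacyException
    println(resultOf(untouched())) // prints "Success(1)"
  }
}
```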


######Testcase

The testcase asserts that the result of the computation is the sum of values that are returned by `oneFuture` (which is 1) and the `futureCallingLegacyCodeWithRecover` (which is 200). 


```language-scala
   it("could be recovered with a recovery value") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds))
      result shouldBe 201
    }
```

**Output**

```
Time taken : 1004
```

#####Using `recoverWith`

Instead of recovering with a value when a `Future` results in an `Exception`, we might want to recover with the result of some other Future in some circumstances.  Say, the failure of an HTTP call to Server1 due to a network failure could be recovered with an HTTP call to another service running on Server2. 

Similar to `recover`, the `recoverWith` accepts a `PartialFunction`.  However, the `PartialFunction` maps a `Throwable` to a `Future` of the same type as the original `Future`.

Just like `recover`, the `PartialFunction` is consulted only if the main `Future` on which `recoverWith` is called fails; the `Future` it maps to is then used in its place. If that second Future results in a successful value, then the new result is returned. 
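
The Server1/Server2 scenario could be sketched roughly as below. There is no real HTTP involved: `fetchFromServer1` and `fetchFromServer2` are hypothetical stand-ins that simply fail and succeed respectively, and the object name is made up.

```language-scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical object, for illustration only
object RecoverWithSketch {
  // Hypothetical stand-in for an HTTP call that fails with a network error
  def fetchFromServer1(): Future[String] =
    Future { throw new IOException("Server1 unreachable") }

  // Hypothetical stand-in for the same call against a second server
  def fetchFromServer2(): Future[String] =
    Future { "response from Server2" }

  // The call to Server2 is made only if the call to Server1 fails with an IOException
  def fetchWithFailover(): Future[String] = fetchFromServer1().recoverWith {
    case _: IOException => fetchFromServer2()
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(fetchWithFailover(), 2.seconds)) // prints "response from Server2"
  }
}
```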

######Code

```language-scala
 val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      println("Exception occurred. Recovering with a Future that wraps 1000")
      Thread.sleep(2000)
      Future(1000)
  }

  def recoversFromExceptionUsingRecoverWith(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecoverWith
  } yield oneValue + futureThrowingException

```

######Testcase

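`futureCallingLegacyCode` fails after 1 second and the recovery block then sleeps for another 2 seconds, so the whole computation takes about 3 seconds (`oneFuture` runs in parallel and finishes within the first second). So, we set the `Await.result` timeout to 4 seconds.  The final result 1001 is the sum of the result of `oneFuture` and `futureCallingLegacyCodeWithRecoverWith`.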


```language-scala
    it("could be recovered with a recovery Future") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds))
      result shouldBe 1001
    }
```

**Output**
```
Time taken : 3006
```

**Note that if the second future also fails, then the error thrown by the second future gets propagated to the caller.**

######Code 

In the following code, we create another `Future` that throws an `Exception` with message **Dieded!!** and we recover the first Future with this error-throwing-Future.  The testcase would reveal that the exception from the second future (recovery one) gets thrown back to the caller. 


```language-scala
 val anotherErrorThrowingFuture: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Dieded!!")
  }

  val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      anotherErrorThrowingFuture
  }

  def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException

```

######Testcase

```language-scala
  it("when recovered with another Future that throws Exception would throw the error from the second Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds))
      }
      exception.msg shouldBe "Dieded!!"
    }
```

#####Using fallbackTo :

`fallbackTo` works just like `recoverWith` when it comes to a successful value: it uses the first Future's value if it is successful, or falls back to the second Future's value.  However, if both the first and the second Future fail, then the error that is propagated to the caller is that of the **first** Future and not the second Future. 


######Code

Let's use the same Futures that we used in the `recoverWith`. 



```language-scala

  val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo (anotherErrorThrowingFuture)

  def recoversFromExceptionUsingFallbackTo(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException
```

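Notice that the `fallbackTo` function just accepts another `Future` and not a `PartialFunction` like `recoverWith`. Since the fallback is passed in as a ready-made `Future`, it is already running regardless of whether the first Future fails.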
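
For completeness, here is a small sketch of both `fallbackTo` outcomes: the fallback succeeding, and both Futures failing. The object, functions and exception messages are hypothetical and chosen only to make the two cases easy to tell apart.

```language-scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical object, for illustration only
object FallbackToSketch {
  def primary(): Future[Int] =
    Future { throw new IllegalStateException("primary failed") }

  def secondary(): Future[Int] = Future { 42 }

  def failingSecondary(): Future[Int] =
    Future { throw new IllegalArgumentException("fallback failed") }

  // The primary fails and the fallback succeeds, so the fallback's value is used
  def withFallback(): Future[Int] = primary().fallbackTo(secondary())

  // Both fail, so the error of the primary (first) Future is the one reported
  def bothFail(): Future[Int] = primary().fallbackTo(failingSecondary())

  // Blocks for the result and captures a thrown exception as a Failure
  def resultOf[T](f: Future[T]): Try[T] = Try(Await.result(f, 2.seconds))

  def main(args: Array[String]): Unit = {
    println(resultOf(withFallback()))                  // prints "Success(42)"
    println(resultOf(bothFail()).failed.get.getMessage) // prints "primary failed"
  }
}
```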

######Testcase

```language-scala

   it("when fallen back to another Future that throws Exception would throw the error from the first Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds))
      }
      exception.msg shouldBe "Danger! Danger!"
    }

```


###Other interesting and useful combinators

The following is a super-brief list of other Future combinators which I find very useful. 

#####zip

`zip` works just like `List.zip`.  It just merges two Futures and yields a `Future` of a `Tuple`. 

```language-scala
def zipTwoFutures: Future[(Int, Int)] = oneFuture zip twoFuture
```
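
A zipped Future is typically consumed by pattern matching on the tuple. The sketch below is a standalone illustration with made-up names; it also shows that the two Futures need not have the same type.

```language-scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical object, for illustration only
object ZipSketch {
  def number(): Future[Int] = Future(1)
  def name(): Future[String] = Future("one")

  // The result completes once both Futures have completed
  def zipped(): Future[(Int, String)] = number() zip name()

  // Destructure the tuple inside map
  def described(): Future[String] = zipped().map {
    case (n, s) => s"$n is spelled $s"
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(described(), 1.second)) // prints "1 is spelled one"
  }
}
```

If either of the two Futures fails, the zipped Future fails as well.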

#####firstCompletedOf
Ah! The `firstCompletedOf` comes in really handy when you have two equivalent services and you want to proceed once the fastest service returns a value. 

```language-scala
  val listOfFutures = List(oneFuture, twoFuture, threeFuture)
  def getFirstResult(): Future[Int] = Future.firstCompletedOf(listOfFutures)
```

In the above case, the `oneFuture` returns the fastest.
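
The "two equivalent services" case might look roughly like this. The `replica` helper, its names and its delays are hypothetical; the sketch also assumes the global `ExecutionContext` runs both calls at the same time (the `blocking` hint helps with that).

```language-scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical object, for illustration only
object FirstCompletedSketch {
  // Hypothetical stand-in for a service replica that answers after `delayMillis`
  def replica(name: String, delayMillis: Long): Future[String] = Future {
    blocking(Thread.sleep(delayMillis))
    s"answer from $name"
  }

  // Both replicas are queried; whichever completes first provides the result
  def fastest(): Future[String] =
    Future.firstCompletedOf(List(replica("slow replica", 1000), replica("fast replica", 50)))

  def main(args: Array[String]): Unit = {
    println(Await.result(fastest(), 2.seconds))
  }
}
```

One thing to keep in mind: "first completed" includes failures, so if the fastest Future fails, the result is that failure.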

#####sequence

The `sequence` is pure magic. Say, you have a `List[Future[Int]]` just like `List(oneFuture,twoFuture,threeFuture)` and you require that all the values are given back to you as a `List[Int]` instead of each `Int` wrapped inside a `Future`. The `sequence` takes your `List[Future[Int]]` and transforms it into a `Future[List[Int]]`.

```language-scala
def getResultsAsList(): Future[List[Int]] = Future.sequence(listOfFutures)
```

The last time I used it was for batching, where I executed logic against chunks of data in parallel and combined the results with `sequence`. 
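
A batching job of that kind could be sketched as follows. This is not the original code: `sumChunk` is a hypothetical stand-in for the real per-chunk logic, and the object name and batch size are arbitrary.

```language-scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical object, for illustration only
object SequenceBatchSketch {
  // Hypothetical per-chunk work: here it just sums the chunk
  def sumChunk(chunk: List[Int]): Future[Int] = Future(chunk.sum)

  def sumInBatches(data: List[Int], batchSize: Int): Future[Int] = {
    // toList forces the iterator, so one Future per chunk is started right away
    val partials: List[Future[Int]] = data.grouped(batchSize).map(sumChunk).toList
    // List[Future[Int]] => Future[List[Int]], then combine the partial results
    Future.sequence(partials).map(_.sum)
  }

  def main(args: Array[String]): Unit = {
    println(Await.result(sumInBatches((1 to 100).toList, 10), 2.seconds)) // prints "5050"
  }
}
```

If any one of the chunk Futures fails, the Future returned by `sequence` fails too.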


----

###Scala-async library


The [Scala Async library](https://github.com/scala/async) is an external project and could be added to the project by adding the dependency into our *build.sbt*

```
"org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"
```

The **Async** library has just two powerful functions in its `scala.async.Async` object - `async` and `await`. 

#####async

The `async` function is very similar to the `Future.apply` function.  In fact their signatures are very much the same and we could comfortably replace the `Future.apply` with `async` wherever it is available.

**Future.apply**

```language-scala

def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T]
```

**Async**

```language-scala

def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T]
```



The foremost advantage of using `async` over `Future.apply`, other than the general ease of readability, is that it generates less code: with a for comprehension, the compiler generates a separate anonymous class for each Future generator, whereas an `async` block is compiled into one single anonymous class.

Therefore, we could re-write our `oneFuture` as, 

```language-scala
val oneFuture: Future[Int] = async {
    Thread.sleep(1000)
    1
}
```

#####await

The `await` function accepts a `Future` and returns the result.  But isn't that the same as `Await.result`, which also accepts a `Future` and returns the result?  Nope. The key difference is that `Await.result` is blocking, and its use is strongly discouraged in production code (testcases are the exception). The `await` function, on the other hand, can only be used inside an `async` block and is implemented using Scala macros: the code that follows the `await` is run through the `onComplete` callback of the Future, so no thread is blocked while waiting. 

Since the `async` function returns a `Future`, all other error handling and recovery mechanisms stay the same as before.  


######Code 

Let's rewrite the previous sum-of-three-numbers with async/await :

```language-scala
  def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async {
    await(oneFuture) + await(twoFuture) + await(threeFuture)
  }
```

######Testcase

```language-scala
  it("could be composed using async/await") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds))
      result shouldBe 6
    }
```

As we see, the code written this way is not only asynchronous but also looks natural (in fact, it looks synchronous).  We could argue that for-comprehensions are a huge leap from using `map` and `flatMap` but `async/await` goes one big step further.

###Code

The [code](https://github.com/arunma/ScalaNotes/blob/master/src/main/scala/FutureCombinators.scala) and its corresponding [testcase](https://github.com/arunma/ScalaNotes/blob/master/src/test/scala/FutureCombinatorsTest.scala) are on github.