Scala notes - Futures - 2 (Promises)

In the last post, we saw how to extract values from a Future using onComplete and its counterparts, onSuccess and onFailure. We also saw how to use Await.result in test cases to block and get the value from a Future. In this post, we'll briefly discuss the relationship between a Promise and a Future.

Promise

The concepts of Promise and Future go hand in hand. A scala.concurrent.Promise is what sets the value of a Future. In other words, the Promise is the write side that an asynchronously running computation completes with its result, and the Future is just a handle for reading that result when it becomes available. Crudely put, the Promise is the setter and the Future is the getter.

Most often, we won't need to explicitly create a Promise. However, we are required to understand what a Promise is, in order to truly understand how a Future works.

Let's use the following examples to understand how to create a Promise.

1. Completing a Promise

In the following piece of code, we will see how a value is set in a promise and how it is read on the other side.

We
1. create a Promise
2. complete the Promise by setting a successful value
3. return the read side of the Promise, the Future, to the caller via promise.future

There is no time-consuming process happening behind the scenes. The value is set on the Promise immediately and is therefore immediately available via the Future.

Code
class PromiseInternals {  
...
  def aCompletedPromiseUsingSuccess(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.success(num)
    promise.future
  }
...
...
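
Testcase

The test case reads the value through the Future. Because the Promise is already completed by the time promise.success(100) returns, the value is available right away: assertValue (shown in section 2) gets it from Await.result without waiting, and an onComplete callback, as in assertValueUsingOnComplete, is scheduled for execution as soon as it is registered.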

class PromiseInternalsTest extends FunSpec with Matchers {  
  describe("A Future") {
    it("gives out the correct value when a Promise is completed") {
      val promiseInternals = new PromiseInternals
      val aCompletedPromise=promiseInternals.aCompletedPromiseUsingSuccess(100)
      assertValue(aCompletedPromise, 100)
    }
...
...

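  // onComplete needs an implicit ExecutionContext in scope. The callback runs on
  // that context's thread, so a failed assertion here may not fail the test itself.
  def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = {
    future.onComplete {
      case Success(result) =>
        println(s"Result is $result and expectedValue is $expectedValue")
        result shouldBe expectedValue
      case Failure(error) => fail(error)
    }
  }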
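
To convince ourselves that nothing is pending here, we can inspect the Future directly instead of waiting on it. The following is a minimal sketch and not code from the repository; CompletedPromiseSketch and completedFuture are names made up for this illustration.

```scala
import scala.concurrent.{Future, Promise}

object CompletedPromiseSketch {
  // Same shape as aCompletedPromiseUsingSuccess in the post (name is hypothetical).
  def completedFuture(num: Int): Future[Int] = {
    val promise = Promise[Int]()
    promise.success(num) // write side: the value is set right here
    promise.future       // read side: handed back to the caller
  }

  def main(args: Array[String]): Unit = {
    val future = completedFuture(100)
    println(future.isCompleted) // true
    println(future.value)       // Some(Success(100))
  }
}
```

Both reads work without an ExecutionContext and without blocking, which is what we would expect from a Promise that was completed before its Future was handed out.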

promise.success is just a shortcut for promise.complete, which accepts a Try[T] as an argument. So we could have written the above function as:

  def aCompletedPromiseUsingComplete(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.complete(Success(num))
    promise.future
  }

Alternatively, if we would like to indicate that the computation failed, we could either use promise.complete(Failure(throwable)) or its shortcut, promise.failure:

  def aCompletedPromiseUsingFailure(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.failure(new RuntimeException("Evil Exception"))
    promise.future
  }
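
To check that the shortcuts and promise.complete really leave the Future in the same state, we can compare what the read side reports in each case. This is only a sketch; CompletionStylesSketch and its three helpers are hypothetical names introduced for the comparison.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

object CompletionStylesSketch {
  // Hypothetical helper: completes a fresh Promise with whatever Try it is given.
  def completedWith(result: Try[Int]): Future[Int] = {
    val promise = Promise[Int]()
    promise.complete(result)
    promise.future
  }

  // success and failure return the Promise itself, so .future can be chained.
  def viaSuccess(num: Int): Future[Int] = Promise[Int]().success(num).future
  def viaFailure(cause: Throwable): Future[Int] = Promise[Int]().failure(cause).future

  def main(args: Array[String]): Unit = {
    val boom = new RuntimeException("Evil Exception")
    println(viaSuccess(100).value == completedWith(Success(100)).value)   // true
    println(viaFailure(boom).value == completedWith(Failure(boom)).value) // true
  }
}
```

On the read side a failed Promise shows up as Some(Failure(exception)) in future.value, which is the same Failure case that an onComplete callback would receive.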

Let's summarize the above in a picture:

[Figure: Future and Promise]

2. Running a block asynchronously

Now that we have seen how to complete a Promise by setting a successful value or an exception, we'll see how to execute a block of code asynchronously.

In the following test case, we pass a block of code to someExternalDelayedCalculation to be executed asynchronously.

Testcase

Let's look at the testcase first.

  1. We pass a block as an argument. The block simply sleeps for 2 seconds and then returns 100.
  2. We wait up to 3 seconds for the Future to complete and then assert its value.

Simple enough.

  it("gives out the correct value when an asynchronous block is submitted and is completed through a Promise") {
      val promiseInternals = new PromiseInternals
      val longCalculationFuture = promiseInternals.someExternalDelayedCalculation{()=>
        Thread.sleep(2000)
        100
      }
      println (s"We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}") //false at this point
      assertValue(longCalculationFuture, 100)
    }

  def assertValue(future: Future[Int], expectedValue: Int): Unit = {
    // Block for at most 3 seconds; needs import scala.concurrent.duration._
    val resultVal = Await.result(future, 3.seconds)
    resultVal shouldBe expectedValue
  }

Code

The implementation of someExternalDelayedCalculation is interesting:

We
1. create a FixedThreadPool to execute our asynchronous code.
2. create a Promise
3. create a Runnable and wrap the block to be run asynchronously in the run method
4. close over the promise inside run and complete it with the result of the block, or with the exception that the block throws
5. execute the Runnable in the somePool threadpool.
6. return the promise.future from which the caller can read the value.

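  // Needs: java.util.concurrent.Executors, scala.concurrent.{Future, Promise},
  // scala.util.{Failure, Success} and scala.util.control.NonFatal
  val somePool = Executors.newFixedThreadPool(2)

  def someExternalDelayedCalculation(f: () => Int): Future[Int] = {
    val promise = Promise[Int]()
    val thisIsWhereWeCallSomeExternalComputation = new Runnable {
      override def run(): Unit = {
        // Complete the promise with the block's result, or with the exception it threw
        promise.complete {
          try Success(f())
          catch {
            case NonFatal(error) => Failure(error)
          }
        }
      }
    }

    // Hand the Runnable to the pool and return the read side right away
    somePool.execute(thisIsWhereWeCallSomeExternalComputation)
    promise.future
  }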
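
The test case above only exercises the happy path. A rough sketch of the failure path follows, assuming we pass the pool in explicitly so that it can be shut down afterwards; DelayedCalculationSketch, delayedCalculation and observeFailure are hypothetical names, not part of the code backing this post.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object DelayedCalculationSketch {
  // Same idea as someExternalDelayedCalculation, with the pool passed in explicitly.
  def delayedCalculation(pool: ExecutorService)(f: () => Int): Future[Int] = {
    val promise = Promise[Int]()
    pool.execute(new Runnable {
      override def run(): Unit =
        promise.complete(try Success(f()) catch { case NonFatal(e) => Failure(e) })
    })
    promise.future
  }

  // Runs a failing block and returns what the caller observes on the Future.
  def observeFailure(): Try[Int] = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      val future = delayedCalculation(pool)(() => throw new IllegalArgumentException("bad input"))
      // Await.ready waits for completion without rethrowing the failure
      Await.ready(future, 3.seconds).value.get
    } finally pool.shutdown() // the pool's non-daemon threads would otherwise keep the JVM alive
  }

  def main(args: Array[String]): Unit =
    println(observeFailure()) // a Failure wrapping the IllegalArgumentException
}
```

The exception never escapes the worker thread. It travels through the Promise and reaches the caller as a Failure, exactly like a value set with promise.failure.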

That's it!

3. How is the Future.apply() actually implemented?

Well, I cheated. The code in section 2 is actually borrowed from the implementation of Future.apply itself.

Remember that in the previous post, we saw that when a block of code is passed into the Future's apply function, it gets executed asynchronously.

Now, compare the code above in someExternalDelayedCalculation with the implementation of Future.apply and the Runnable that it wraps (as it looked in the Scala 2.11 standard library; newer versions are implemented differently).

  def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.prepare.execute(runnable)
    runnable.promise.future
  }

class PromiseCompletingRunnable[T](body: => T) extends Runnable {  
    val promise = new Promise.DefaultPromise[T]()

    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }

To repeat the same steps as above, the apply function

  1. takes the thread pool that we provide as the implicit ExecutionContext
  2. creates a Promise inside a PromiseCompletingRunnable, which is a Runnable
  3. wraps the block to be run asynchronously in the run method
  4. closes over the promise inside run and completes it with the result of the block, or with the exception that the block throws
  5. executes the Runnable using the ExecutionContext
  6. returns the promise.future from which the caller can read the value.
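
Seen from the caller's side, this means our hand-rolled function from section 2 could be replaced by Future.apply running on an ExecutionContext that wraps the same kind of thread pool. The following is a sketch under that assumption; FutureApplySketch, delayedViaFutureApply and run are made-up names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FutureApplySketch {
  // Future.apply creates the Promise, wraps the block and submits it for us.
  def delayedViaFutureApply(f: () => Int)(implicit ec: ExecutionContext): Future[Int] =
    Future(f())

  def run(): Int = {
    // Wrap a fixed thread pool, like somePool in section 2, as an ExecutionContext.
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    try {
      val future = delayedViaFutureApply { () =>
        Thread.sleep(200)
        100
      }
      Await.result(future, 3.seconds)
    } finally ec.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(run()) // 100
}
```

The Promise is still there, it is just hidden inside the library, which is why we rarely need to create one ourselves.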

4. Once written, twice error

Once the Promise is completed with either a Success or a Failure, all we can do after that is extract the value from its Future. Also, the onComplete callback of the Future gets called. The value wrapped inside the Future of the Promise is set in stone and cannot be changed.

If we attempt to set a new value by completing an already completed Promise, an IllegalStateException is thrown.

Code

Let's look at this using a snippet. In the following code, we create a Promise and complete it with a value of 100. We then attempt to complete it with a failure.

  def alreadyCompletedPromise(): Future[Int] = {
    val promise = Promise[Int]()
    promise.success(100) //completed
    promise.failure(new RuntimeException("Will never be set because an IllegalStateException will be thrown beforehand"))
    promise.future
  }

Testcase

The test case just asserts that an IllegalStateException is thrown when we attempt to complete the already completed Promise with a Failure.

  it("should throw an error if a Promise is attempted to be completed more than once") {
      val promiseInternals = new PromiseInternals
      intercept[IllegalStateException] {
        promiseInternals.alreadyCompletedPromise()
      }
    }
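
If several parties may race to complete the same Promise and throwing is not what we want, the standard library also offers trySuccess, tryFailure and tryComplete, which return a Boolean instead of raising an IllegalStateException. A small sketch of how that behaves; TryCompleteSketch and completeTwice are hypothetical names.

```scala
import scala.concurrent.Promise
import scala.util.Try

object TryCompleteSketch {
  // Returns (first attempt accepted?, second attempt accepted?, final value).
  def completeTwice(): (Boolean, Boolean, Option[Try[Int]]) = {
    val promise = Promise[Int]()
    val first = promise.trySuccess(100)                               // accepted
    val second = promise.tryFailure(new RuntimeException("ignored"))  // rejected, no exception
    (first, second, promise.future.value)
  }

  def main(args: Array[String]): Unit =
    println(completeTwice()) // (true,false,Some(Success(100)))
}
```

The first write still wins and the value stays set in stone. The only difference is how the second attempt is reported to the writer.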

Code

The complete code backing this blog post is available on GitHub.
