Scala Futures on a Single Thread

Futures, introduced into Scala standard library in version 2.10, quickly became one of the cornerstones of Scala concurrency. Their main selling point is the ease of composition. A standard example of that involves asynchronous calls to a number of remote services and computation of the answer once all of the responses are collected:

def price(itemId: Id): Future[Price] = ???

def availability(itemId: Id): Future[Int] = ???

def orderQuote(itemId: Id, quantityRequired: Int): Future[Option[Price]] = for {
  unitPrice         <- price(itemId)
  quantityAvailable <- availability(itemId)
} yield if (quantityAvailable >= quantityRequired) Some(quantityRequired * unitPrice) else None

Here price and availability are service calls that can potentially take long time and are meant to be executed in parallel.

However, the usefulness of futures is not restricted to the concurrent setting. They also provide a nice pattern for suspending execution of a sequential algorithm until a certain condition is met. There is nothing inherently concurrent about Future implementation either – the runtime behaviour depends on the instance of ExecutionContext provided to functions that can spawn new computations.

The example below illustrates how futures and promises facilitate suspension of effects until the required condition – the availability of price in this case – is met.

import java.util.concurrent.Executor

import scala.collection.mutable
import scala.concurrent._
import scala.util.{Success, Failure}

// thanks to being implicit this execution context will be used by all Future methods that 
// can spawn new Futures, making all spawned computations synchronous
implicit val synchronousExecutionContext = ExecutionContext.fromExecutor(new Executor {
  def execute(task: Runnable) = task.run()
})
 
object PriceService {
  val prices = mutable.HashMap[String, Int]()
  val pendingRequests = mutable.HashMap[String, Set[Promise[Int]]]() withDefaultValue Set()
 
  def setPrice(symbol: String, price: Int): Unit = {
    prices += (symbol -> price)
    println(s"${Thread.currentThread.getName}: price received: $symbol: $price")
    pendingRequests(symbol).foreach(_.success(price))
  }
 
  def price(symbol: String): Future[Int] = 
    if (prices.contains(symbol)) Future.successful(prices(symbol))
    else {
      val request = Promise[Int]()
      val updatedRequests = if (pendingRequests.contains(symbol)) pendingRequests(symbol) + request
                            else                                  Set(request)
      pendingRequests += (symbol -> updatedRequests)
      request.future
    }
}
 
def publishValuation(symbol: String, quantity: Int): Unit = 
  PriceService.price(symbol).map(_ * quantity).onComplete {
    case Success(value) => println(s"${Thread.currentThread.getName}: $quantity x $symbol: $value)")
    case Failure(e)     => println(s"error: $e")
  }
 
PriceService.setPrice("AAA", 1013)
publishValuation("AAA", 20)
publishValuation("BBB", 30)
PriceService.setPrice("BBB", 221)

This code prints out:

main: price received: AAA: 1013
main: 20 x AAA: 20260)
main: price received: BBB: 221
main: 30 x BBB: 6630)

demonstrating that unfulfilled condition doesn’t block the program even though only a single thread is used – and all that happens without breaking the abstraction boundaries between PriceService and the valuation publishing code.