# A Pitfall of Global Execution Context

The convenience of Scala’s Future concept makes it the default tool for all kinds of programming tasks that call for asynchrony. There seems nothing simpler than wrapping a block of code in future { }, then, when compilation fails, importing scala.concurrent.ExecutionContext.Implicits.global, and we’re done, the code runs asynchronously. We can then use the computed result (if any) in further futures, or if we want to break out of the future world, do Await.result. This is indeed easy, and does the job in many cases, but when it does not, might cause a bit of head scratching. Below is one example of such pitfall.

## The Conundrum

Suppose we wanted to test the behaviour of a data store that is expected to support multiple concurrent readers. Say, there is a suspicion that every now and then, when faced with concurrent reads and writes, some of the readers obtain inconsistent data. We could write the test as an application that performs writes and multiple reads in parallel, then run it for a long time to give us some confidence that the issue does not occur. Our first attempt could look like this:

import scala.annotation.tailrec

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future

object Test {
val consumerCount = 10

def main(args: Array[String]): Unit = {
(1 to consumerCount).foreach { consumer.start }
producer.start()

}

private object consumer {
def start(id: Int): Unit = future { consume(id) }

@tailrec
private def consume(id: Int): Unit = {
consume(id)
}

private def readDataFromTheSourceAndVerifyTheConsistency(id: Int): Unit = {
println(s"consumer $id consuming...") Thread.sleep(1) println(s"consumer$id consumed")
}
}

private object producer {
def start(): Unit = future { produce() }

@tailrec
private def produce(): Unit = {
writeANewBatchOfData()
produce()
}

private def writeANewBatchOfData(): Unit = {
println(s"producer producing...")
println(s"producer produced")
}
}
}

The tail of the output of this test, on my system, looked as follows:

[...]
consumer 7 consuming...
consumer 4 consumed
consumer 4 consuming...
consumer 3 consumed
consumer 3 consuming...
consumer 6 consumed
consumer 6 consuming...
consumer 9 consumed
consumer 9 consuming...
consumer 8 consumed
consumer 8 consuming...
consumer 2 consumed
consumer 2 consuming...
consumer 10 consumed
consumer 10 consuming...
consumer 5 consumed
consumer 5 consuming...
consumer 7 consumed
consumer 7 consuming...
consumer 4 consumed
consumer 4 consuming...
consumer 9 consumed
consumer 9 consuming...
consumer 8 consumed
consumer 8 consuming...

The consumers seem to be exhibiting reasonable parallelism but due to the cost of consumption relative to the cost of production, they have nothing to consume most of the time – it takes many loops of each consumer until the producer manages to push a new batch of data. To ensure that the test indeed exercises parallel reads, we only want the consumers to proceed with the next iteration once there is something new to consume. Java’s CyclicBarrier seems to be a perfect fit for this synchronisation problem: both consumers and producers will reach the barrier and then continue with the next iteration – production or consumption – in lock-step.

import scala.annotation.tailrec

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future

import java.util.concurrent.CyclicBarrier

object Test {
val consumerCount = 10

def main(args: Array[String]): Unit = {
val dataBarrier = new CyclicBarrier(consumerCount + 1)

(1 to consumerCount).foreach { consumer.start(dataBarrier) }
producer.start(dataBarrier)

}

private object consumer {
def start(consumeBarrier: CyclicBarrier)(id: Int): Unit = future { consume(consumeBarrier, id) }

@tailrec
private def consume(consumeBarrier: CyclicBarrier, id: Int): Unit = {
consumeBarrier.await()
consume(consumeBarrier, id)
}

private def readDataFromTheSourceAndVerifyTheConsistency(id: Int): Unit = {
println(s"consumer $id consuming...") Thread.sleep(1) println(s"consumer$id consumed")
}
}

private object producer {
def start(produceBarrier: CyclicBarrier): Unit = future { produce(produceBarrier) }

@tailrec
private def produce(produceBarrier: CyclicBarrier): Unit = {
writeANewBatchOfData()
produceBarrier.await()
produce(produceBarrier)
}

private def writeANewBatchOfData(): Unit = {
println(s"producer producing...")
println(s"producer produced")
}
}
}

Now, when we run this, we can see that the consumers only execute once producer has produced a new batch, and while it is producing the next one:

[...]
producer produced
producer producing...
consumer 10 consuming...
consumer 8 consuming...
consumer 5 consuming...
consumer 3 consuming...
consumer 9 consuming...
consumer 7 consuming...
consumer 1 consuming...
consumer 6 consuming...
consumer 4 consuming...
consumer 2 consuming...
consumer 10 consumed
consumer 8 consumed
consumer 5 consumed
consumer 3 consumed
consumer 9 consumed
consumer 7 consumed
consumer 1 consumed
consumer 6 consumed
consumer 4 consumed
consumer 2 consumed
producer produced
producer producing...
consumer 10 consuming...
consumer 8 consuming...
consumer 5 consuming...
consumer 3 consuming...
consumer 9 consuming...
consumer 7 consuming...
consumer 1 consuming...
consumer 2 consuming...
consumer 4 consuming...
consumer 6 consuming...
consumer 10 consumed
consumer 8 consumed
consumer 5 consumed
consumer 3 consumed
consumer 9 consumed
consumer 7 consumed
consumer 1 consumed
consumer 2 consumed
consumer 4 consumed
consumer 6 consumed

This should give us a good chance of spotting an anomaly due to concurrent production and consumption. Now, we push this to the continuous integration environment and see the test freezing; not even a single iteration of any consumer is executed.

What has happened?

## The Cause

Among many differences between the local and continuous ingegration environment (e.g. Windows vs. Linux), a significant one in this case is the number of cores. While our development workstation had six hyper-threaded cores, the CI box has only four. The documentation of scala.concurrent.ExecutionContext.Implicits.global notes:

The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of available processors.

We are asking the test to spawn ten consumers and one producer, i.e. 11 tasks. While they can be allocated among the 12 virtual cores of the workstation, the CI only has 8 slots, meaning that 8 of the tasks start executing and wait on the barrier, while the remaining 3 have no chance to be scheduled and allow everyone to pass the barrier.

Does it mean we cannot get higher degree of parallelism for our producer and consumers? No, it does not; these tasks spent the bulk of their time performing I/O, so they are not CPU-bound and their number should not be restricted by the number of available processors.

## The Fix

Once we know the above, a fix is pretty obvious. Instead of:

import scala.concurrent.ExecutionContext.Implicits.global

define:

implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(
new java.util.concurrent.ForkJoinPool(consumerCount + 1))

in a scope that includes all invocations of future.

## Conclusion

I would not use the global execution context whenever the asynchronous computations require any kind of synchronisation, as it can easily end in a deadlock, as in the example above. Not only that – spawning anything long-running in this fashion might cause exhaustion of the global fork-join pool and would require reasoning about all asynchronous computations in the program globally. Perhaps more generally, future should not be treated as a drop-in construct that just makes things go faster and not block. Whenever tempted to do this, always consider how you expect the executor of this future to behave, and, if in doubt, define a local one.