Java Multithreading for Senior Engineering Interviews (Notes from an Educative course)


The Basics

The simplest example of a concurrent system to think about is a single-processor machine running your favourite IDE. Say you edit one of your code files and click save; that click initiates a workflow which causes bytes to be written out to the underlying physical disk. However, IO is an expensive operation, and the CPU will be idle while the bytes are being written out.

Whilst IO takes place, the idle CPU could work on something useful, and here is where threads come in: the IO thread is switched out and the UI thread gets scheduled on the CPU, so that if you click elsewhere on the screen, your IDE is still responsive and does not appear hung or frozen.

Threads can give the illusion of multitasking even though at any given point in time the CPU is executing only one thread. Each thread gets a slice of time on the CPU and then gets switched out, either because it initiates a task which requires waiting (and would not utilise the CPU) or because it completes its time slot. This is context switching and time-slicing.

Of course, if we have multiple cores, then a multithreaded program can execute in parallel, with each thread running on a different CPU core.

Benefits of threads

  1. Higher throughput, though in some pathological scenarios the overhead of context switching among threads can steal away any throughput gains and result in worse performance than a single-threaded scenario. However, such cases are unlikely and the exception rather than the norm.
  2. Responsive applications that give the illusion of multi-tasking.
  3. Efficient utilisation of resources. Note that thread creation is light-weight in comparison to spawning a brand new process. Web servers that use threads instead of creating new processes when fielding web requests, consume far fewer resources.

The following code can be run to compare the time it takes to finish the task when using one thread versus two. On a multicore machine, the two-threaded version finishes faster.

class Demonstration {
    public static void main(String args[]) throws InterruptedException {
        SumUpExample.runTest();
    }
}

class SumUpExample {

    long startRange;
    long endRange;
    long counter = 0;
    static long MAX_NUM = Integer.MAX_VALUE;

    public SumUpExample(long startRange, long endRange) {
        this.startRange = startRange;
        this.endRange = endRange;
    }

    public void add() {
        for (long i = startRange; i <= endRange; i++) {
            counter += i;
        }
    }

    public static void twoThreads() throws InterruptedException {
        long start = System.currentTimeMillis();
        SumUpExample s1 = new SumUpExample(1, MAX_NUM / 2);
        SumUpExample s2 = new SumUpExample(1 + (MAX_NUM / 2), MAX_NUM);

        Thread t1 = new Thread(() -> s1.add());
        Thread t2 = new Thread(() -> s2.add());

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        long finalCount = s1.counter + s2.counter;
        long end = System.currentTimeMillis();
        System.out.println("Two threads final count = " + finalCount + " took " + (end - start));
    }

    public static void oneThread() {
        long start = System.currentTimeMillis();
        SumUpExample s = new SumUpExample(1, MAX_NUM);
        s.add();
        long end = System.currentTimeMillis();
        System.out.println("Single thread final count = " + s.counter + " took " + (end - start));
    }

    public static void runTest() throws InterruptedException {
        oneThread();
        twoThreads();
    }
}

Problems with threads

  1. Bugs are usually very hard to find; some may only rear their head in production environments. Threads execute concurrently and introduce non-deterministic behaviour: the same program may produce different output when run at different times.
  2. Higher cost of code maintenance since the code inherently becomes harder to reason about.
  3. Increased utilisation of system resources. Creation of each thread consumes additional memory, CPU cycles for book-keeping, and time lost in context switches.
  4. Programs may experience slowdown as coordination amongst threads comes at a price. Acquiring and releasing locks adds to program execution time. Threads fighting over acquiring locks cause lock contention.

Program Vs Process Vs Thread

A program is a set of instructions and associated data that resides on the disk and is loaded by the operating system to perform some task. An executable file or a Python script file are examples of programs. In order to run a program, the operating system’s kernel is first asked to create a new process, which is an environment in which a program executes.

A process is a program in execution. A process is an execution environment that consists of instructions, user-data, and system-data segments, as well as lots of other resources such as CPU, memory, address-space, disk and network I/O acquired at runtime. A program can have several copies of it running at the same time but a process necessarily belongs to only one program.

Thread is the smallest unit of execution in a process. A thread simply executes instructions serially. A process can have multiple threads running as part of it. Usually, there would be some state associated with the process that is shared among all the threads and in turn each thread would have some state private to itself. The globally shared state amongst the threads of a process is visible and accessible to all the threads, and special attention needs to be paid when any thread tries to read or write to this global shared state.


Concurrency Vs Parallelism

Serial execution: tasks are finished one at a time.

A concurrent program is one that can be decomposed into constituent parts and each part can be executed out of order or in partial order without affecting the final outcome. The classic example of a concurrent system is that of an operating system running on a single core machine. Such an operating system is concurrent but not parallel. It can only process one task at any given point in time but all the tasks being managed by the operating system appear to make progress because the operating system is designed for concurrency. Each task gets a slice of the CPU time to execute and move forward.

A parallel system is one which necessarily has the ability to execute multiple programs at the same time. Usually, this capability is aided by hardware in the form of multicore processors on individual machines or as computing clusters where several machines are hooked up to solve independent pieces of a problem simultaneously.

Cooperative Multitasking vs Preemptive Multitasking

Preemptive: The operating system’s scheduler decides which thread or program gets to use the CPU next and for how much time. We don’t need to define it in our code.

Cooperative: Involves well-behaved programs to voluntarily give up control back to the scheduler so that another program can run. The operating system’s scheduler has no say in how long a program or thread runs for. A malicious program can bring the entire system to a halt by busy waiting or running an infinite loop and not giving up control.

Synchronous vs Asynchronous

Synchronous execution is synonymous with serial execution: the program waits for each task to complete before moving on.

Asynchronous (or async) execution refers to execution that doesn’t block when invoking subroutines. An asynchronous program doesn’t wait for a task to complete and can move on to the next task.

Usually, such methods return an entity sometimes called a future or promise that is a representation of an in-progress computation. The program can query for the status of the computation via the returned future or promise and retrieve the result once completed. Another pattern is to pass a callback function to the asynchronous function call which is invoked with the results when the asynchronous function is done processing.
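
As a sketch of the future/promise pattern in Java (using CompletableFuture; this example is mine, not from the course):

import java.util.concurrent.CompletableFuture;

class AsyncExample {

    public static void main(String[] args) {
        // supplyAsync() starts the computation on a worker thread and
        // immediately returns a future representing the in-progress result.
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 6 * 7);

        // Callback style: thenAccept() registers work to run once the result is ready.
        CompletableFuture<Void> done =
                future.thenAccept(result -> System.out.println("Result: " + result));

        done.join(); // block here only so the demo doesn't exit early
    }
}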

Asynchronous programming is an excellent choice for applications that do extensive network or disk I/O and spend most of their time waiting. As an example, JavaScript enables concurrency using asynchronous method calls such as AJAX requests. In non-threaded environments, asynchronous programming provides an alternative to threads for achieving concurrency, and it falls under the cooperative multitasking model.

I/O Bound vs CPU Bound

Programs which are compute-intensive i.e. program execution requires very high utilization of the CPU (close to 100%) are called CPU bound programs. Such programs primarily depend on improving CPU speed to decrease program completion time. This could include programs such as data crunching, image processing, matrix multiplication etc.

I/O bound programs are the opposite of CPU bound programs. Such programs spend most of their time waiting for input or output operations to complete while the CPU sits idle. I/O operations can consist of operations that write or read from main memory or network interfaces.

If a program is CPU bound we can increase the number of processors and structure our program to spawn multiple threads that individually run on a dedicated or shared CPU. For I/O bound programs, it makes sense to have a thread give up CPU control if it is waiting for an I/O operation to complete so that another thread can get scheduled on the CPU and utilize CPU cycles. Different programming languages come with varying support for multithreading. For instance, Javascript is single-threaded, Java provides full-blown multithreading and Python is sort of multithreaded as it can only have a single thread in running state because of its global interpreter lock (GIL) limitation. However, all three languages support asynchronous programming models which is another way for programs to be concurrent (but not parallel).

For completeness we should mention that there are also memory-bound programs that depend on the amount of memory available to speed up execution.

Throughput VS Latency

Throughput is defined as the rate of doing work or how much work gets done per unit of time. If you are an Instagram user, you could define throughput as the number of images your phone or browser downloads per unit of time.

Latency is defined as the time required to complete a task or produce a result. Latency is also referred to as response time. The time it takes for a web browser to download Instagram images from the internet is the latency for downloading the images.

Critical Sections & Race Conditions

A critical section is any piece of code that may be executed concurrently by more than one thread of the application and that accesses shared data or resources used by the application.

In a race condition, threads access shared resources or program variables that might be worked on by other threads at the same time causing the application data to be inconsistent.

Life cycle of a Thread

  • new: the thread has been created but not yet started
  • runnable: the thread is ready for execution and waiting to be scheduled
  • running: the thread is currently executing
  • blocked: the thread is waiting for a monitor lock to be released
  • terminated: the thread completes or is stopped

Thread creation

  • Implementing runnable interface
  • Extending Thread class

What’s the difference between sleep() and wait() methods in java? When will you use each one?

The sleep() method is used to pause the execution of a thread for a specified period of time, e.g. Thread.sleep(100). This can be any thread, including the main thread, which is the default thread when you run a Java program.

After the time has elapsed, the thread can resume operation. However, note that exactly when it resumes is not guaranteed, because it depends on the scheduler's decision; other threads may be given priority. The actual duration may also vary between platforms and JVM implementations due to timer precision and other platform-specific factors.

On the other hand, the wait() method is used for inter-thread communication and allows a thread to release the lock and wait until it is notified by another thread to resume operation.
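
A small sketch contrasting the two (my own example, not from the course):

class SleepVsWait {

    public static void main(String[] args) throws InterruptedException {
        final Object lock = new Object();

        // sleep() pauses the current thread for (at least) the given time;
        // any monitors the thread holds are NOT released while it sleeps.
        Thread.sleep(100);

        // wait() must be called while holding the object's monitor; it
        // releases that monitor and suspends the thread until notify() or
        // notifyAll() is called on the same object, or the timeout elapses.
        synchronized (lock) {
            lock.wait(100);
        }
    }
}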

Explain the concept of thread safety and how it can be achieved in Java

Thread safety refers to the ability of a piece of code or data structure to be safely accessed by multiple threads without causing data corruption or unexpected behavior.

In Java, it can be achieved by using synchronisation, by using concurrent data structures from the java.util.concurrent package, or by ensuring that shared mutable objects are accessed in a thread-safe manner.

public class Sum {

    int sum(int... vals) {
        int total = 0;
        for (int i = 0; i < vals.length; i++) {
            total += vals[i];
        }
        return total;
    }
}

The class Sum is stateless i.e. it doesn't have any member variables. All stateless objects and their corresponding classes are thread-safe. Since the actions of a thread accessing a stateless object can't affect the correctness of operations in other threads, stateless objects are thread-safe.

However, note that the method takes variable arguments. The class would no longer be thread safe if the passed-in argument were a shared array instead of individual integers and, at the same time, the sum method performed a write operation on the passed-in array. A sketch of that caveat follows.
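
A hedged sketch of that caveat (the mutating method is my own illustration):

class SumUnsafe {

    // If callers share the array across threads and the method writes to it,
    // concurrent calls can observe and produce corrupted data.
    int sumAndClear(int[] vals) {
        int total = 0;
        for (int i = 0; i < vals.length; i++) {
            total += vals[i];
            vals[i] = 0; // write to the shared input breaks thread safety
        }
        return total;
    }
}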

Race Condition:

A race condition occurs when the correctness of a computation depends on the relative timing or interleaving of multiple threads by the runtime; in other words, getting the right answer relies on lucky timing.

1.  class HitCounter {
2.
3.      long count = 0;
4.
5.      void hit() {
6.          count++;
7.      }
8.
9.      long getHits() {
10.         return this.count;
11.     }
12. }
  1. Say count = 7
  2. Thread A is about to execute line #6, which consists of fetching the count variable, incrementing it, and then writing it back.
  3. Thread A reads the count value equal to 7
  4. Thread A gets context switched from the processor
  5. Thread B executes line #6 atomically and increments count = 8
  6. Thread A gets scheduled again
  7. Thread A had previously read count = 7, so it increments it to 8 and writes it back.
  8. The net effect is count ends up with a value of 8 when it should have been 9.

This is an example of read-modify-write type of race condition.
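
One way to fix the counter (a sketch, not the course's canonical solution) is to make the read-modify-write atomic, either by marking hit() and getHits() synchronized or by using an atomic class:

import java.util.concurrent.atomic.AtomicLong;

class SafeHitCounter {

    private final AtomicLong count = new AtomicLong();

    void hit() {
        count.incrementAndGet(); // the fetch-increment-write happens atomically
    }

    long getHits() {
        return count.get();
    }
}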

Deadlocks, Liveness & Reentrant Locks

Deadlocks occur when two or more threads aren’t able to make any progress because the resource required by the first thread is held by the second and the resource required by the second thread is held by the first.

import java.util.concurrent.locks.ReentrantLock;

class DeadlockExample {

    private final ReentrantLock mutexA = new ReentrantLock();
    private final ReentrantLock mutexB = new ReentrantLock();

    void increment() {
        mutexA.lock();
        mutexB.lock();
        // do work here
        mutexB.unlock();
        mutexA.unlock();
    }

    void decrement() {
        mutexB.lock(); // locks acquired in the opposite order
        mutexA.lock();
        // do work here
        mutexA.unlock();
        mutexB.unlock();
    }
}

// deadlock example
T1 enters function increment

T1 acquires mutexA

T1 gets context switched by the operating system

T2 enters function decrement

T2 acquires mutexB

T1 now waits for mutexB while T2 waits for mutexA: both threads are blocked, each holding the lock the other needs

Ability of a program or an application to execute in a timely manner is called liveness. If a program experiences a deadlock then it’s not exhibiting liveness.

A live-lock occurs when two threads continuously react in response to the actions of the other thread without making any real progress. The best analogy is to think of two people trying to pass each other in a hallway: they keep moving to the same side at the same time, never get past each other, and keep blocking each other.

Other than a deadlock, an application thread can also experience starvation, when it never gets CPU time or access to shared resources. Other greedy threads continuously hog shared system resources not letting the starving thread make any progress.

A reentrant lock, also known as a recursive lock, is a synchronisation mechanism used in concurrent programming to control access to a shared resource by multiple threads. It allows a thread that already holds the lock to acquire it again, without blocking itself. This feature is useful in situations where a thread needs to enter a section of code protected by the lock multiple times, without being blocked by its own previous acquisitions of the lock. Reentrant locks maintain a count of the number of times a thread has acquired the lock. When a thread acquires the lock, the count is incremented. The lock is only fully released when the thread releases it the same number of times it acquired it. This ensures that other threads waiting for the lock are not blocked unnecessarily.
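
A minimal sketch of reentrancy using Java's ReentrantLock (method names are illustrative):

import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {

    private final ReentrantLock lock = new ReentrantLock();

    void outer() {
        lock.lock(); // hold count becomes 1
        try {
            inner(); // the same thread re-acquires without blocking itself
        } finally {
            lock.unlock(); // hold count back to 0; lock fully released
        }
    }

    void inner() {
        lock.lock(); // hold count becomes 2
        try {
            // critical section
        } finally {
            lock.unlock(); // hold count back to 1
        }
    }
}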

Mutex Vs Semaphores

Mutex as the name hints implies mutual exclusion. A mutex is used to guard shared data such as a linked-list, an array or any primitive type. A mutex allows only a single thread to access a resource or critical section.

Once a thread acquires a mutex, all other threads attempting to acquire the same mutex are blocked until the first thread releases the mutex. Once released, most implementations arbitrarily choose one of the waiting threads to acquire the mutex and make progress.

Semaphore, on the other hand, is used for limiting access to a collection of resources. Think of a semaphore as having a limited number of permits to give out. If a semaphore has given out all the permits it has, then any new thread that comes along requesting a permit will be blocked till an earlier thread with a permit returns it to the semaphore. A typical example would be a pool of database connections that can be handed out to requesting threads. Say there are ten available connections but 50 requesting threads. In such a scenario, a semaphore can only give out ten permits or connections at any given point in time; a sketch follows.
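
A hedged sketch of that connection-pool example (class and method names are illustrative):

import java.util.concurrent.Semaphore;

class ConnectionPool {

    private static final int MAX_CONNECTIONS = 10;
    private final Semaphore permits = new Semaphore(MAX_CONNECTIONS);

    void query() throws InterruptedException {
        permits.acquire(); // blocks if all ten connections are handed out
        try {
            // use one of the pooled connections
        } finally {
            permits.release(); // return the connection to the pool
        }
    }
}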

A semaphore with a single permit is called a binary semaphore and is often thought of as an equivalent of a mutex, which isn’t completely correct as we’ll shortly explain.

Differences:

(1) Ownership

A semaphore can potentially act as a mutex if the permits it can give out is set to 1. However, the most important difference between the two is that in case of a mutex the same thread must call acquire and subsequent release on the mutex whereas in case of a binary semaphore, different threads can call acquire and release on the semaphore.

This leads us to the concept of ownership. A mutex is owned by the thread acquiring it till the point the owning-thread releases it, whereas for a semaphore there’s no notion of ownership.

(2) Signalling

Another distinction between a semaphore and a mutex is that semaphores can be used for signalling amongst threads, for example in case of the classical producer/consumer problem the producer thread can signal the consumer thread by incrementing the semaphore count to indicate to the consumer thread to consume the freshly produced item. A mutex in contrast only guards access to shared data among competing threads by forcing threads to serialize their access to critical sections and shared data-structures.

import java.util.concurrent.Semaphore;

private static final int PERMITS = 1;
private static final Semaphore semaphore = new Semaphore(PERMITS); // class-level field

// inside the thread's run() method
semaphore.acquire();
// ... critical section ...
semaphore.release();

Monitor (this section is an explanation from a YouTube video, which I felt is much clearer than the one by Educative.io)

Semaphore is a huge step up from the equivalent load/store implementation.

However, what was not so nice about Semaphores?

  • They are essentially global variables that multiple threads modify.
  • There is no linguistic connection between the semaphore and the data it protects.
  • There is no control or guarantee of proper usage; correctness depends on developers writing wait(), signal(), etc. in the right places.

Therefore, we consider a Monitor which is a higher level construct.

A monitor is a class that ties together the data, the operations, and in particular the synchronisation operations.

Unlike ordinary classes, monitors guarantee mutual exclusion (only one thread may execute the monitor's methods at a time) and require all data to be private.

A formal definition: a monitor defines a lock and zero or more condition variables for managing concurrent access to shared data. The monitor uses the lock to ensure that only a single thread is active in the monitor at any time. The condition variables enable threads to go to sleep inside critical sections: waiting on a condition variable releases the lock at the same time as it puts the thread to sleep.

To turn a Java class into a Monitor
1. make all the data private
2. make all methods synchronized (the non private ones)

import java.util.LinkedList;

class Queue {
    private final LinkedList<Object> items = new LinkedList<>(); // data

    public synchronized void add(Object item) {
        items.add(item); // put item in queue
    }

    public synchronized Object remove() {
        if (!items.isEmpty()) {
            return items.removeFirst(); // remove and return item
        }
        return null;
    }
}

In Java, we don’t need to manage the lock. Monitor is built into the language.

If the queue is empty, and you call remove(), you want the thread to go to sleep and be blocked until the queue has items.

Why can’t we do that? How can we change remove() to wait until something is on the queue.

When you are inside remove(), which is a critical section, you are holding the lock. If the thread goes to sleep there, it still holds the lock. This prevents other threads from executing, from accessing the shared queue to add an item, and from waking up the sleeping thread.

Hence, we introduce condition variables. Condition variables enable a thread to sleep inside a critical section; any lock held by the thread is automatically released when the thread is put to sleep.

Condition variable: A queue of threads waiting for something inside a critical section.

It supports 3 operations

  1. wait(Lock lock): atomically releases the lock and goes to sleep; when the thread wakes up, it re-acquires the lock
  2. signal(): wakes up one waiting thread, if one exists
  3. broadcast(): wakes up all waiting threads

Rule: a thread must hold the lock when performing condition-variable operations.

In Java, it is slightly different. The operations are called:

  • wait() to give up the lock
  • notify() to signal that the condition a thread is waiting on is satisfied
  • notifyAll() to wake up all waiting threads

Effectively, there is one condition variable per object.

import java.util.LinkedList;

class Queue {
    private final LinkedList<Object> items = new LinkedList<>(); // data

    public synchronized void add(Object item) {
        items.add(item); // put item in queue
        notify(); // wake up a waiting consumer, if any
    }

    public synchronized Object remove() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // give up lock and go to sleep
        }
        return items.removeFirst(); // remove and return item
    }
}

wait() and notify() are methods available on every class in Java, because every class inherits from the Object class. Hence, we can call them in any class, using the object itself as the condition variable. If you want another condition variable, you can add another object and call wait and notify on it.

Why did we not use an if statement? Why use a while loop?

There are 2 types of monitors: Mesa-style (Java uses this) and Hoare-style (textbooks).

Mesa-style

  • The thread that signals keeps the lock (and thus the processor)
  • The waiting thread waits for the lock

Hoare-style

  • The thread that signals gives up the lock and the waiting thread gets the lock
  • When the thread that was waiting and is now executing exits or waits again, it releases the lock back to the signaling thread.

***Hence, in Mesa style, when the thread wakes up, it is not handed the lock. Another thread might get the lock first and remove an item from the queue. When the woken thread finally re-acquires the lock, it might find that the queue is empty again, and so it might need to wait() again after being awakened. Hence, we use a while loop to re-check the condition.***

Unless we use Hoare-style monitors, we need to use a while loop.

Semaphores vs Monitor

Semaphores have memory; condition variables do not.

If you call signal on a condition variable and nothing is waiting, nothing happens. If you call signal (release) on a semaphore, regardless of whether anything is blocked or waiting, the semaphore's count is still incremented.

The ordering of calls on a semaphore does not matter. If the initial value of a semaphore is 0, and we have two threads, one calling wait (acquire) and one calling signal (release), the thread that calls wait() may not be blocked: if the signal() happens before the wait(), the semaphore's value becomes 1, and the wait() can then immediately proceed. Basically, you will always end up with the same number of blocked threads regardless of the execution order.

Whereas for a condition variable, a wait() will always block until someone does a signal(). If a thread does signal() before any wait(), the signal is lost, and a subsequent wait() will block until another signal() comes. But if wait() executes before signal(), it will not be blocked for long. Here, ordering matters.

Condition variables only work inside a lock. If we use semaphores inside a lock, we may get deadlock.

If we want to implement monitors with semaphores, we can do so. But we need to make sure of the two things below:

  1. A thread that calls wait() must always block.
  2. The monitor must not store state: a signal with no waiting thread is lost.

Note: The above section may have some overlaps with the subsequent sections

Mutex Vs Monitor

Concisely, a monitor wraps operation with a mutex. Monitors are generally language level constructs whereas mutex and semaphore are lower-level or OS provided constructs.

Usually, a consumer will need to repeatedly check a predicate to see if it has become true (i.e. the condition it is waiting for now holds) before proceeding.

void busyWaitFunction() {
    // acquire mutex
    while (predicate is false) {
        // release mutex
        /*
         * Release the mutex so that other threads can acquire it and
         * modify the predicate; otherwise the value being checked
         * could never change.
         */

        // acquire mutex
        /*
         * Acquire the mutex again so that the predicate can be checked
         * safely, seeing the latest update.
         */
    }
    // do something useful
    // release mutex
}

Spin waiting is a technique used in concurrent programming where a thread continuously checks a condition in a loop, known as a spin loop, until the condition becomes true. It involves repeatedly executing a small piece of code that checks the condition, without yielding the CPU to other threads.

In the provided code snippet, the spin waiting is implemented using a while loop. The thread acquires a mutex (a mutual exclusion lock) and then enters the while loop. Inside the loop, it releases the mutex, allowing other threads to acquire it, and then immediately tries to acquire the mutex again. This process continues until the predicate (the condition being checked) becomes true. Once the predicate is true, the thread exits the loop, performs some useful work, and then releases the mutex.

While spin waiting can achieve its intended purpose of synchronising threads, it is generally considered inefficient because it consumes CPU cycles even when the condition being waited upon is not yet true.

Spin waiting -> Condition variables

To address the inefficiency of spin waiting, condition variables are commonly used. Instead of continuously checking the condition in a loop, a thread can suspend its execution, wait on a condition variable, and be notified when the condition becomes true. This allows the thread to sleep and relinquish the CPU, reducing resource consumption.

Explanation:

In the context of condition variables, there are two main operations:

  1. wait(): When a thread calls the wait() method on a condition variable, it releases the associated mutex atomically and enters a wait queue. The thread waits in the queue until it is signaled or notified by another thread. Releasing the mutex allows other threads to acquire it and potentially change the state of the condition being waited upon.
  2. signal(): The signal() method, when called on a condition variable, wakes up one of the threads waiting in its associated wait queue. The awakened thread is then placed in a ready queue, but it does not start executing immediately. The thread must wait to reacquire the mutex associated with the condition variable before it can proceed.

A condition variable and a mutex are two different things.

void efficientWaitingFunction() {
    mutex.acquire()
    while (predicate == false) {
        condVar.wait()
    }
    // Do something useful
    mutex.release()
}

void changePredicate() {
    mutex.acquire()
    set predicate = true
    condVar.signal()
    mutex.release()
}

// Similar code in Java. Note that this code does not use the synchronized
// keyword; it uses its own explicit lock.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MutexConditionExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean conditionMet = false;

    public void doWork() throws InterruptedException { // efficientWaitingFunction
        lock.lock();
        try {
            while (!conditionMet) {
                condition.await(); // wait until the condition is met
            }
            // Perform work here
            System.out.println("Work is being done...");
        } finally {
            lock.unlock();
        }
    }

    public void signalCondition() {
        lock.lock();
        try {
            conditionMet = true;
            condition.signal(); // signal that the condition is met
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MutexConditionExample example = new MutexConditionExample();

        // Create and start a worker thread
        Thread workerThread = new Thread(() -> {
            try {
                example.doWork();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        workerThread.start();

        // Sleep for a while to simulate some processing
        Thread.sleep(2000);

        // Signal the condition to wake up the worker thread
        example.signalCondition();

        // Wait for the worker thread to complete
        workerThread.join();
    }
}

In the provided code snippet, the efficientWaitingFunction() demonstrates the usage of condition variables. The thread acquires a mutex, enters a while loop, and checks the predicate. If the predicate is false, indicating the desired condition is not met, the thread calls condVar.wait(), which releases the mutex and places the thread in a wait queue. This allows other threads to acquire the mutex and potentially change the predicate. The thread remains in the wait queue until it is signalled.

  • By acquiring the mutex before checking the predicate, the thread ensures that it has exclusive access to the shared data. This guarantees that the value of the predicate it checks is the most up-to-date and accurate representation of the shared state. If the mutex was not acquired, another thread could modify the predicate between the check and the subsequent wait() call, leading to incorrect behaviour.

The changePredicate() function represents another thread that can modify the state of the predicate. It acquires the mutex, changes the predicate to true, and then calls condVar.signal() to wake up one of the threads waiting on the condition variable. Finally, it releases the mutex.

  • Signalling the condition variable before releasing the mutex ensures that the waiting thread is awakened and ready to proceed as soon as possible. If the signal() call happened after releasing the mutex, the mutex could be captured by some other thread and the waiting thread might not be immediately ready for execution, and there could be a delay until it gets a chance to run.

The order of signalling the condition variable and releasing the mutex can be important. In general, it is preferred to signal first and then release the mutex.

Monitor

The code above demonstrates the usage of a mutex (in the form of a ReentrantLock) and a condition variable (in the form of a Condition) to achieve synchronization and signaling between threads. It is a monitor-like approach.

In summary, a monitor is a higher-level construct that combines both data and synchronization, while a mutex and condition variable are lower-level primitives used for mutual exclusion and thread communication, respectively.

All the condition variables can only be accessed by the procedures inside the monitor.

A single monitor can have multiple condition variables but not vice versa. Theoretically, another way to think about a monitor is to consider it as an entity having two queues or sets where threads can be placed. One is the entry set and the other is the wait set. When a thread A enters a monitor it is placed into the entry set. If no other thread owns the monitor, which is equivalent of saying no thread is actively executing within the monitor section, then thread A will acquire the monitor and is said to own it too. Thread A will continue to execute within the monitor section till it exits the monitor or calls wait() on an associated condition variable and be placed into the wait set. While thread A owns the monitor no other thread will be able to execute any of the critical sections protected by the monitor. New threads requesting ownership of the monitor get placed into the entry set.

Continuing with our hypothetical example, say another thread B comes along and gets placed in the entry set, while thread A sits in the wait set. Since no other thread owns the monitor, thread B successfully acquires the monitor and continues execution. If thread B exits the monitor section without calling notify() on the condition variable, then thread A will remain waiting in the wait set. Thread B can also invoke wait() and be placed in the wait set along with thread A. This then would require a third thread to come along and call notify() on the condition variable on which both threads A and B are waiting. Note that only a single thread will be able to own the monitor at any given point in time and will have exclusive access to data structures or critical sections protected by the monitor.

Practically, in Java each object is a monitor and implicitly has a lock and is a condition variable too. You can think of a monitor as a mutex with a wait set. Monitors allow threads to exercise mutual exclusion as well as cooperation by allowing them to wait and signal on conditions.

See this for another explanation: https://www.baeldung.com/cs/monitor

wait() orders the calling thread to release the monitor and go to sleep until some other thread enters this monitor and calls notify()

Java’s Monitor & Hoare vs Mesa Monitors

In Java, the monitor concept is built into the language through the synchronized keyword, which provides a way to create synchronized blocks and methods.

public class MonitorExample {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized void decrement() {
        count--;
    }

    public synchronized int getCount() {
        return count;
    }

    public static void main(String[] args) {
        MonitorExample example = new MonitorExample();

        Thread incrementThread = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                example.increment();
            }
        });

        Thread decrementThread = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                example.decrement();
            }
        });

        incrementThread.start();
        decrementThread.start();

        try {
            incrementThread.join();
            decrementThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Count: " + example.getCount());
    }
}

By using the synchronized keyword, the methods in the MonitorExample class create a monitor around the shared count variable, ensuring that only one thread can access and modify it at a time, thus providing thread safety.

When a thread executes a synchronized block or enters a synchronized method, it automatically acquires the lock associated with the object or class. This ensures that only one thread can execute the synchronized code at a time, providing mutual exclusion and synchronization.

We do not use explicit wait(), notify(), or notifyAll() within these synchronized blocks because this example does not require explicit coordination between threads, such as waiting for a specific condition to be met.

Java provides a built-in mechanism to provide atomicity called the synchronized block. A synchronized method is a shorthand for a synchronized block that spans an entire method body and whose lock is the object on which the method is being invoked.
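
As a quick illustration of that shorthand (the class and method names are mine, not from the course), the two methods below acquire the same lock:

class Shorthand {

    // These two methods are equivalent: both lock the object ('this')
    // on which the method is invoked.
    public synchronized void a() {
        // critical section
    }

    public void b() {
        synchronized (this) {
            // critical section
        }
    }
}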

///////////////////////////////////////////////

In Java, every object is a condition variable and has an associated lock that is hidden from the developer. Each Java object exposes wait() and notify() methods.

Before we execute wait() on a java object we need to lock its hidden mutex.

That is done implicitly through the synchronized keyword. If you attempt to call wait() or notify() outside of a synchronized block, an IllegalMonitorStateException would occur.

The ownership of the monitor can be achieved in the following ways:

  • the method the thread is executing has synchronized in its signature
  • the thread is executing a block that is synchronized on the object on which wait or notify will be called
  • in case of a class, the thread is executing a static method which is synchronized.

Below are bad synchronisation examples

class BadSynchronization {

    public static void main(String args[]) throws InterruptedException {
        Object dummyObject = new Object();

        // Attempting to call wait() on the object
        // outside of a synchronized block.
        dummyObject.wait();
    }
}

class BadSynchronization {

    public static void main(String args[]) {
        Object dummyObject = new Object();
        Object lock = new Object();

        synchronized (lock) {
            lock.notify();

            // Attempting to call notify() on the object
            // in a synchronized block of another object
            dummyObject.notify();
        }
    }
}

So far we have determined that the idiomatic usage of a monitor requires using a while loop as follows. Let’s see how the design of monitors affects this recommendation.

while (condition == false) {
    condVar.wait();
}

Once the asleep thread is signalled and wakes up, you may ask why it needs to check the condition again; surely the signalling thread has just set the condition to true?

In Mesa monitors (Mesa being a language developed by Xerox researchers in the 1970s), it is possible that in the time gap between thread B calling notify() and releasing its mutex and the instant at which the asleep thread A wakes up and reacquires the mutex, the predicate is changed back to false by another thread, different from the signaller and the awoken thread! The woken-up thread competes with other threads to acquire the mutex once the signalling thread B empties the monitor. On signalling, thread B doesn't give up the monitor just yet; rather, it continues to own the monitor until it exits the monitor section.

In contrast, in Hoare monitors (Hoare being one of the original inventors of monitors), the signalling thread B yields the monitor to the woken-up thread A, and thread A enters the monitor while thread B sits out. This guarantees that the predicate will not have changed, and instead of checking the predicate in a while loop, an if-clause would suffice. The woken-up thread A immediately starts execution when the signalling thread B signals that the predicate has changed. No other thread gets a chance to change the predicate, since no other thread gets to enter the monitor.

Java, in particular, subscribes to Mesa monitor semantics and the developer is always expected to check for condition/predicate in a while loop. Mesa monitors are more efficient than Hoare monitors.

A further reason why we need a while loop:

* A thread can also wake up without being notified, interrupted, or
* timing out, a so-called <i>spurious wakeup</i>. While this will rarely
* occur in practice, applications must guard against it by testing for
* the condition that should have caused the thread to be awakened and
* continuing to wait if the condition is not satisfied. In other words,
* waits should always occur in loops, like this one:

Semaphore vs Monitor

Monitor, mutex, and semaphores can be confusing concepts initially. A monitor is made up of a mutex and a condition variable. One can think of a mutex as a subset of a monitor. Differences between a monitor and semaphore are discussed below.

  • A monitor and a semaphore are interchangeable and theoretically, one can be constructed out of the other or one can be reduced to the other. However, monitors take care of atomically acquiring the necessary locks whereas, with semaphores, the onus of appropriately acquiring and releasing locks is on the developer, which can be error-prone.
  • Semaphores are lightweight when compared to monitors, which are bloated. However, the tendency to misuse semaphores is far greater than monitors. When using a semaphore and mutex pair as an alternative to a monitor, it is easy to lock the wrong mutex or just forget to lock altogether. Even though both constructs can be used to solve the same problem, monitors provide a pre-packaged solution with less dependency on a developer’s skill to get the locking right.
  • Java monitors enforce correct locking by throwing an IllegalMonitorStateException when methods on a condition variable are invoked without first acquiring the associated lock. The exception is in a way saying that either the object's lock/mutex was not acquired at all or an incorrect lock was acquired.
  • A semaphore can allow several threads access to a given resource or critical section, however, only a single thread at any point in time can own the monitor and access associated resource.
  • Semaphores can be used to address the issue of missed signals, however with monitors additional state, called the predicate, needs to be maintained apart from the condition variable and the mutex which make up the monitor, to solve the issue of missed signals.

Additional comment

When should you use synchronised + wait() + notify() VS Java Concurrent Library?

The former is much simpler and is the traditional approach, with less code. The latter enables more fine-grained control over locking and synchronisation, and allows multiple locks or conditions to be used.

The Java concurrent library also offers features not available with synchronized methods, such as fairness policies (ReentrantLock), interruptible locking, and try-locking with a timeout.

Multithreading in Java

Atomic assignments

What operations are performed atomically by the language?

For example, counter++ is not.

According to the Java Specifications,

  • Assignments and reads for primitive data types except for double and long are always atomic. If two threads are invoking someMethod() and passing in 5 and 7 for the integer counter variable, then the variable will hold either 5 or 7 and not any other value. There will be no partial writes of bits from either thread.
  • For double and long, reads and writes are not guaranteed to be atomic; we then need to use the volatile keyword.
  • All reference assignments are atomic. By reference we mean a variable holding a memory location address, where an object has been allocated by the JVM. For instance, consider the snippet Thread currentThread = Thread.currentThread(); The variable currentThread holds the address for the current thread’s object. If several threads execute the above snippet, the variable currentThread will hold one valid memory location address and not a garbage value. It can’t happen that the variable currentThread holds some bytes from the assignment operation of one thread and other bytes from the assignment operation of an another thread.

Bear in mind that atomic assignments promised by the Java platform don’t imply thread-safety! They only provide guarantees of indivisible updates to a single variable.
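
A minimal sketch of these guarantees (the class and field names are illustrative):

class AtomicityExamples {

    int counter;              // reads and assignments are always atomic
    volatile long bigCounter; // long/double assignments are atomic only if volatile
    Thread currentThread;     // reference assignments are always atomic

    void someMethod(int value) {
        counter = value;       // no torn writes, but counter++ would NOT be atomic
        bigCounter = value;    // atomic because the field is volatile
        currentThread = Thread.currentThread(); // always a valid reference
    }
}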

Thread Safety & Synchronized

A class and its public APIs are labelled as thread safe if multiple threads can consume the exposed APIs without causing race conditions or state corruption for the class.

Note that composition of two or more thread-safe classes doesn’t guarantee the resulting type to be thread-safe.
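
A hedged illustration of why composition breaks down (the Range class is my own example, not from the course): each field below is thread-safe on its own, yet the invariant lower <= upper spans both fields and can still be violated.

import java.util.concurrent.atomic.AtomicInteger;

class Range {
    private final AtomicInteger lower = new AtomicInteger(0);
    private final AtomicInteger upper = new AtomicInteger(10);

    // NOT thread-safe: another thread can change 'upper' between the
    // check and the set, breaking the lower <= upper invariant.
    void setLower(int value) {
        if (value <= upper.get()) {
            lower.set(value);
        }
    }
}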

Java’s most fundamental construct for thread synchronization is the synchronized keyword. It can be used to restrict access to critical sections one thread at a time.

Each object in Java has an entity associated with it called the “monitor lock” or just monitor. Think of it as an exclusive lock. Once a thread gets hold of the monitor of an object, it has exclusive access to all the methods marked as synchronized. No other thread will be allowed to invoke a method on the object that is marked as synchronized and will block, till the first thread releases the monitor which is equivalent of the first thread exiting the synchronized method.

// TWO different locks

class Employee {

    // shared variable
    private String name;
    private Object lock = new Object();

    // method is synchronized on 'this' object
    public synchronized void setName(String name) {
        this.name = name;
    }

    // also synchronized on 'this', the same object
    public synchronized void resetName() {
        this.name = "";
    }

    public String getName() {
        // Synchronized block using a different object ('lock') to synchronize
        // on, so this method does NOT mutually exclude with the two methods
        // above, which lock 'this'.
        synchronized (lock) {
            return this.name;
        }
    }
}

Note with the use of the synchronized keyword, Java forces you to implicitly acquire and release the monitor-lock for the object within the same method! One can’t explicitly acquire and release the monitor in different methods. This has an important ramification, the same thread will acquire and release the monitor! In contrast, if we used semaphore, we could acquire/release them in different methods or by different threads.

Wait & Notify

The wait method is exposed on each Java object. Each Java object can act as a condition variable. When a thread executes the wait method, it releases the monitor for the object and is placed in the wait queue. Note that the thread must be inside a synchronized block of code that synchronizes on the same object as the one on which wait() is being called; in other words, the thread must hold the monitor of the object on which it'll call wait. If not, an IllegalMonitorStateException is raised!

Like the wait method, notify() can only be called by the thread which owns the monitor for the object on which notify() is being called, else an IllegalMonitorStateException is thrown. The notify method will awaken one of the threads waiting on the object's monitor.

However, this thread will not be scheduled for execution immediately and will compete with other active threads that are trying to synchronize on the same object. The thread which executed notify will also need to give up the object’s monitor, before any one of the competing threads can acquire the monitor and proceed forward.

The notifyAll() method is the same as notify() except that it wakes up all the threads that are waiting on the object's monitor.

Interrupting Threads

When a thread wait()-s or sleep()-s, one way for it to give up waiting/sleeping is to be interrupted. If a thread is interrupted while waiting/sleeping, it'll wake up and immediately throw an InterruptedException.

The Thread class exposes the interrupt() method, which can be used to interrupt a thread that is blocked in a sleep() or wait() call. Note that invoking the interrupt method only sets a flag; that flag is polled periodically by sleep and wait to detect that the current thread has been interrupted, at which point an InterruptedException is thrown.

class Demonstration {

    public static void main(String args[]) throws InterruptedException {
        InterruptExample.example();
    }
}

class InterruptExample {

    static public void example() throws InterruptedException {

        final Thread sleepyThread = new Thread(new Runnable() {

            public void run() {
                try {
                    System.out.println("I am too sleepy... Let me sleep for an hour.");
                    Thread.sleep(1000 * 60 * 60);
                } catch (InterruptedException ie) {
                    System.out.println("The interrupt flag is cleared : " + Thread.interrupted() + " " + Thread.currentThread().isInterrupted());

                    // We interrupt the thread again and no exception is thrown.
                    // This is to emphasize that merely calling the interrupt method
                    // isn't responsible for throwing the InterruptedException; rather,
                    // the implementation should periodically check the interrupt
                    // status and take appropriate action.
                    Thread.currentThread().interrupt();

                    System.out.println("Oh someone woke me up!");

                    // We print the interrupt status for the thread, which is set
                    // to true because of the interrupt() call above.
                    System.out.println("The interrupt flag is set now : " + Thread.currentThread().isInterrupted() + " " + Thread.interrupted());
                }
            }
        });

        sleepyThread.start();

        System.out.println("About to wake up the sleepy thread ...");
        sleepyThread.interrupt();
        System.out.println("Woke up sleepy thread ...");

        sleepyThread.join();
    }
}

Volatile

If you have a variable say a counter that is being worked on by a thread, it is possible the thread keeps a copy of the counter variable in the CPU cache and manipulates it rather than writing to the main memory. The JVM will decide when to update the main memory with the value of the counter, even though other threads may read the value of the counter from the main memory and may end up reading a stale value.

If a variable is declared volatile then whenever a thread writes or reads to the volatile variable, the read and write always happen in the main memory. As a further guarantee, all the variables that are visible to the writing thread also get written-out to the main memory alongside the volatile variable. Similarly, all the variables visible to the reading thread alongside the volatile variable will have the latest values visible to the reading thread.

Note that volatile presents a consistent view of the memory to all the threads. However, remember that volatile doesn’t imply or mean thread-safety.

If there’s a single thread that writes to the volatile variable and other threads only read the volatile variable then just using volatile is enough, however, if there’s a possibility of multiple threads writing to the volatile variable then “synchronized” would be required to ensure atomic writes to the variable.
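
A minimal sketch of that single-writer pattern where volatile alone suffices (names are illustrative):

class Worker {

    // Written by one thread, read by others: volatile guarantees the
    // readers always see the latest value without any locking.
    private volatile boolean shutdownRequested = false;

    void requestShutdown() { // called from a single controller thread
        shutdownRequested = true;
    }

    void runLoop() { // called from worker threads
        while (!shutdownRequested) {
            // do work
        }
    }
}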

Reentrant Locks & Condition Variables

In Java, the traditional mutex (mutual exclusion) concept is implemented by the synchronized keyword. It allows only one thread at a time to acquire the lock and access the synchronised block or method. However, there are situations where more advanced locking mechanisms are required. Java provides a more flexible alternative called the ReentrantLock.

Synchronized is reentrant; therefore, if a thread is already synchronized on an object, it'll be able to resynchronize on it.

A ReentrantLock is a mutual exclusion lock that allows threads to acquire the lock multiple times, hence the term “reentrant.” This means that a thread that has already acquired the lock can acquire it again without blocking itself. ReentrantLock keeps track of the number of times a thread has acquired the lock and requires the same number of releases to fully release the lock. This feature is useful in certain scenarios, such as nested synchronized blocks or recursive algorithms.

It is similar to the implicit monitor lock accessed when using synchronized methods or blocks. With the reentrant lock, you are free to lock and unlock it in different methods but not with different threads.

import java.util.concurrent.locks.ReentrantLock;

private static ReentrantLock lock = new ReentrantLock();

// typically: acquire, do work, and release in a finally block
lock.lock();
try {
    // critical section
} finally {
    lock.unlock();
}

The reentrant lock also provides a fairness policy, where the lock grants access to the longest-waiting thread.

private final ReentrantLock lock = new ReentrantLock(true); // true = fair

Condition Variable

In Java, the wait(), notify(), and notifyAll() methods are provided by the Object class and are used for thread synchronization. These methods allow threads to communicate and coordinate with each other by suspending and resuming their execution based on certain conditions. However, they have some limitations, such as the inability to have multiple wait-sets per object or the lack of explicit signalling.

To address these limitations, Java provides the Condition interface, which can be seen as a more flexible replacement for the traditional object monitor methods (wait(), notify(), and notifyAll()). A Condition represents a specific condition or state of an object and allows threads to wait or signal based on that condition.

  1. await(): This method is similar to wait(). It causes the current thread to wait until the condition is signalled by another thread. The thread releases the associated lock and enters a waiting state. When the condition is signalled, the thread is awakened and automatically re-acquires the lock before returning from the await() call.
  2. signal(): This method is similar to notify(). It wakes up one waiting thread that is associated with the condition. If multiple threads are waiting, the specific thread that gets signalled is not guaranteed and depends on the implementation.
  3. signalAll(): This method is similar to notifyAll(). It wakes up all waiting threads associated with the condition. All the awakened threads will compete to acquire the lock before resuming execution.

The Condition interface is typically used in conjunction with a ReentrantLock object. By obtaining a Condition instance from a ReentrantLock, you can use the await(), signal(), and signalAll() methods to implement more fine-grained and controlled thread synchronisation and communication.

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

// The Condition object represents a condition variable,
// which can be used to suspend threads and control their execution based
// on a certain condition becoming true or false.

// Threads can call the await() method on a Condition object to
// voluntarily give up the lock associated with the Lock object and enter
// a waiting state until another thread signals or interrupts them.
// The await() method should be called within a block of code protected by
// the associated lock.

// Below is an example usage
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private boolean conditionMet = false;

    public void awaitCondition() throws InterruptedException {
        lock.lock();
        try {
            while (!conditionMet) {
                // await() enters a waiting state, releasing the associated lock
                condition.await();
            }
            // Condition is met, continue execution
        } finally {
            lock.unlock();
        }
    }

    public void signalCondition() {
        lock.lock();
        try {
            // change the state of conditionMet to true and signal any waiting threads
            conditionMet = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

In summary, a Condition represents a separate object associated with a specific condition or state. It is used in combination with a Lock to provide advanced thread synchronization and coordination, allowing threads to wait and signal based on conditions.

Java’s util.concurrent package provides several classes that can be used for solving everyday concurrency problems and should always be preferred than reinventing the wheel. Its offerings include thread-safe data structures such as ConcurrentHashMap.
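
For instance, a hedged sketch of a thread-safe counter map using ConcurrentHashMap (the class is illustrative):

import java.util.concurrent.ConcurrentHashMap;

class WordCounts {
    private final ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();

    void record(String word) {
        // merge() performs the per-key read-modify-write atomically,
        // so no external synchronization is needed.
        counts.merge(word, 1, Integer::sum);
    }

    int countOf(String word) {
        return counts.getOrDefault(word, 0);
    }
}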

Missed signals

A missed signal happens when a signal is sent by a thread before the other thread starts waiting on a condition.

Missed signals are caused by using the wrong concurrency constructs. In the example below, a condition variable is used to coordinate between the signaller and the waiter thread. The condition is signalled at a time when no thread is waiting on it causing a missed signal.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class Demonstration {

    public static void main(String args[]) throws InterruptedException {
        MissedSignalExample.example();
    }
}

class MissedSignalExample {

    public static void example() throws InterruptedException {

        final ReentrantLock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();

        Thread signaller = new Thread(new Runnable() {

            public void run() {
                lock.lock();
                condition.signal();
                System.out.println("Sent signal");
                lock.unlock();
            }
        });

        Thread waiter = new Thread(new Runnable() {

            public void run() {
                lock.lock();

                try {
                    condition.await(); // await without checking any condition
                    System.out.println("Received signal");
                } catch (InterruptedException ie) {
                    // handle interruption
                }

                lock.unlock();
            }
        });

        signaller.start();
        signaller.join();

        waiter.start();
        waiter.join();

        System.out.println("Program Exiting.");
    }
}
  1. The signaller thread is started and acquires the lock using lock.lock().
  2. The signaller thread signals the condition by calling condition.signal().
  3. The signaller thread prints "Sent signal" and releases the lock using lock.unlock().
  4. The waiter thread starts and attempts to acquire the lock using lock.lock().
  5. The waiter thread calls condition.await() to wait for a signal.
  6. However, at this point, there is a possibility for a missed signal because the signaller thread has already signaled the condition before the waiter thread had a chance to start waiting.

The above code, when run, will never print the statement "Program Exiting." and execution will time out.

If the waiter thread starts and executes the lock.lock() statement before the signaller thread executes condition.signal(), the signal will be received successfully. However, if the timing is such that the signaller thread executes condition.signal() before the waiter thread starts waiting, the signal may be missed. This is because there is no guarantee that the waiter thread is actively waiting when the signal is sent.

To mitigate the possibility of missed signals, a common approach is to introduce a “guard” condition, which ensures that the waiter thread waits only if the expected condition is not yet satisfied. This helps prevent missed signals by checking the condition again after the thread resumes from waiting.

class MissedSignalExample {

public static void example() throws InterruptedException {

final ReentrantLock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
final boolean[] isSignalReceived = { false };

Thread signaller = new Thread(() -> {
lock.lock();
condition.signal();
System.out.println("Sent signal");
isSignalReceived[0] = true;
lock.unlock();
});

Thread waiter = new Thread(() -> {
lock.lock();
try {
while (!isSignalReceived[0]) { // Guard condition
condition.await();
}
System.out.println("Received signal");
} catch (InterruptedException ie) {
// handle interruption
} finally {
lock.unlock();
}
});

signaller.start();
signaller.join();

waiter.start();
waiter.join();

System.out.println("Program Exiting.");
}
}

// In this updated version, a boolean variable isSignalReceived is
// introduced as a guard condition. The waiter thread checks this
// condition before waiting and continues waiting until the signal is
// received. By using the guard condition, the waiter thread ensures
// that it will not miss the signal even if it started waiting after
// the signal was sent.

// It is similar to checking if a condition has been met

Another possible fix is to use a semaphore for signalling between the two threads as shown below.

import java.util.concurrent.Semaphore;

class Demonstration {

public static void main(String args[]) throws InterruptedException {
FixedMissedSignalExample.example();
}
}

class FixedMissedSignalExample {

public static void example() throws InterruptedException {

final Semaphore semaphore = new Semaphore(0); // zero initial permits, so the waiter blocks until the signaller releases one

Thread signaller = new Thread(new Runnable() {

public void run() {
semaphore.release();
System.out.println("Sent signal");
}
});

Thread waiter = new Thread(new Runnable() {

public void run() {
try {
semaphore.acquire();
System.out.println("Received signal");
} catch (InterruptedException ie) {
// handle interruption
}
}
});

signaller.start();
signaller.join();
Thread.sleep(5000);
waiter.start();
waiter.join();

System.out.println("Program Exiting.");
}
}

Semaphore in Java

Java’s semaphore can be release()-ed or acquire()-d for signalling amongst threads. However, the important callout when using semaphores is to make sure that the number of permits acquired equals the number of permits returned.

Whenever using locks or semaphores, remember to unlock the lock or release the semaphore in a finally block.
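For instance, a minimal sketch of this pattern (the class and method names are ours, for illustration):

import java.util.concurrent.Semaphore;

class SafeRelease {

private final Semaphore semaphore = new Semaphore(1);

void doWork() throws InterruptedException {
semaphore.acquire();
try {
// ... critical section ...
} finally {
// the permit is returned even if the critical section throws,
// keeping permits acquired equal to permits returned
semaphore.release();
}
}
}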

Spurious wake ups

Spurious means fake or false. A spurious wakeup means a thread is woken up even though no signal has been received. Spurious wakeups are a reality and are one of the reasons why the pattern for waiting on a condition variable happens in a while loop, as discussed in earlier chapters. There are technical reasons beyond our current scope as to why spurious wakeups happen, but for the curious: on POSIX-based operating systems, when a process is signalled, all its waiting threads are woken up. The comment below is lifted directly from Java’s documentation for the wait(long timeout) method.

* A thread can also wake up without being notified, interrupted, or
* timing out, a so-called <i>spurious wakeup</i>. While this will rarely
* occur in practice, applications must guard against it by testing for
* the condition that should have caused the thread to be awakened and
* continuing to wait if the condition is not satisfied. In other words,
* waits should always occur in loops, like this one:
*
* synchronized (obj) {
* while (condition does not hold)
* obj.wait(timeout);
* ... // Perform action appropriate to condition
* }
*

Atomic Classes

So far we have seen locks that allow shared data to be manipulated safely among multiple threads. However, locking doesn’t come for free and takes a toll on performance, especially when the shared data/state is contended for access by multiple threads.

Cons of Locking

Locking comes with its downsides some of which include:

Thread scheduling vs useful work

JVM is very efficient when it comes to acquiring and releasing a lock that is requested by a single thread. However, when multiple threads attempt to acquire the same lock, only one wins and the rest must be suspended. The suspension and resumption of threads is costly and introduces significant overhead and this can be an issue for scenarios where several threads contend for the same lock but execute very little functionality. In such cases, the time spent in scheduling overhead overwhelms the useful work done. This is true of synchronized collections where the majority of methods perform very few operations.

Priority inversion

A higher priority thread can be blocked by a lower priority thread that holds the lock and itself is blocked because of a page fault, scheduling delay etc. This situation effectively downgrades the priority of the higher-priority thread to that of the lower-priority thread since the former can’t make progress until the latter releases the lock. In general, all threads that require a particular lock can’t make progress until the thread holding the lock releases it.

Liveness issues

Locking also introduces the possibility of liveness issues such as deadlocks, livelocks, or simply programming bugs that have threads caught in infinite loops blocking other threads from making progress.

Locking, a heavyweight mechanism

In general locking is a heavyweight mechanism, especially for fine-grained tasks such as manipulating a counter. Locking is akin to assuming the worst or preparing for the worst possible scenario, i.e. the thread assumes it would necessarily run into contention with another thread and acquires a lock to manipulate shared state. Another approach could be to update shared state hoping it would complete without contention/interference from other participants. In case contention is detected, the update operation can be failed, and if desired, reattempted later.

Atomic VS Volatile

Short of locking, we have volatile variables that promise the same visibility guarantees as a lock; however, volatile variables can’t be used for:

  • Executing compound actions (operations that involve multiple steps), e.g. decrementing a counter involves fetching the counter value, decrementing it, and then writing the updated value, for a total of three steps. Volatile variables are not suitable here because they only guarantee visibility for individual operations, not atomicity for compound actions.
  • When the value of a variable depends on another or the new value of a variable depends on its older value. If one thread modifies a variable while another thread is using its value, the ordering and synchronization of these operations may lead to race conditions or inconsistent results.

The above limitations are addressed by atomic classes, which offer similar memory visibility guarantees as volatile variables and also allow operations such as read-modify-write to be executed atomically.

volatile AtomicInteger atomicInteger = new AtomicInteger();

Note that marking the AtomicInteger variable above as volatile isn’t superfluous: it implies that when the variable atomicInteger is updated to a new reference, the updated value it holds will be observed by all threads that read the variable.

In the absence of volatile the atomicInteger variable’s value (which points to a memory location) may get cached by a processor and the new object the variable points to after the update, may not be visible to the processor that cached the old value.

Atomic variables can also be thought of as “better volatiles”. Atomic variables make ideal counters, sequence generators, and variables that accumulate running statistics. They offer the same memory semantics as volatile variables with additional support for atomic updates and may be better choices than volatile variables in most circumstances.
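For instance, a counter built on AtomicInteger performs the read-modify-write as a single atomic operation, something a plain volatile int can't do. A minimal sketch (the class name is ours):

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {

private final AtomicInteger count = new AtomicInteger(0);

// incrementAndGet() performs the read-modify-write atomically,
// unlike count++ on a volatile int, which is three separate steps
int increment() {
return count.incrementAndGet();
}

int get() {
return count.get();
}
}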

Atomic Processor Instructions

  • Compare and Swap

In general, the CAS instruction has three operands:

  1. A memory location, say M, representing the variable we desire to manipulate.
  2. An expected value for the variable, say A. This is the latest value seen for the variable.
  3. The new value, say B, which we want the variable to update to.

CAS instruction works by performing the following actions atomically:

  1. Check the latest value of the memory location M.
  2. If the memory location has a value other than A, then it implies that another thread changed the variable since the last time we examined it and therefore the requested update operation should be aborted.
  3. If the variable’s value is indeed A, then it implies that no other thread has had a chance to change the variable to a different value than A, since we last examined the variable’s value and therefore we can proceed to update the variable/memory location to the new value B.

The idiomatic usage of CAS usually takes the form of reading the value A of a shared variable, deriving a new value B from the value A, and finally invoking CAS to update the variable from A to B if it hasn’t been changed to another value in the meantime.
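As an illustration, here is that retry loop sketched with AtomicInteger, whose compareAndSet() method exposes CAS (the doubling operation is arbitrary, chosen for illustration):

import java.util.concurrent.atomic.AtomicInteger;

class CasRetryLoop {

private final AtomicInteger value = new AtomicInteger(1);

int doubleValue() {
while (true) {
int a = value.get(); // read the current value A
int b = a * 2; // derive the new value B from A
if (value.compareAndSet(a, b)) {
return b; // CAS succeeded without interference
}
// CAS failed: another thread changed the value in the meantime, retry
}
}
}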

  • ABA Problem

CAS succeeds even if the value of a shared variable is changed from A to B and then back to A. Consider the following sequence:

  1. A thread T1 reads the value of a shared variable as A and desires to change it to B. After reading the variable’s value, thread T1 undergoes a context switch.
  2. Another thread, T2 comes along, changes the value of the shared variable from A to B and then back to A from B.
  3. Thread T1 is scheduled again for execution and invokes CAS with A as the expected value and B as the new value. CAS succeeds since the current value of the variable is A, even though it changed to B and then back to A in the time thread T1 was context switched.

For some algorithms, this is not a problem but it may be a problem for others. This problem usually occurs when a program manages its own memory rather than leaving it to the Garbage Collector.

For example, you may want to recycle the nodes in your linked list for performance reasons. Thus noting that the head of the list still references the same node, may not necessarily imply that the list wasn’t changed. One solution to this problem is to attach a version number with the value, i.e. instead of storing the value as A, we store it as a pair (A, V1). Another thread can change the value to (B, V1) but when it changes it back to A the associated version is different i.e. (A, V2). In this way, a collision can be detected. There are two classes in Java that can be used to address the ABA problem:

  1. AtomicStampedReference (please see this class in our reference section for detailed explanation of ABA problem)
  2. AtomicMarkableReference
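A minimal sketch of the versioning idea using AtomicStampedReference (the values and stamps are chosen for illustration):

import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo {

public static void main(String[] args) {
// value "A" paired with version 1
AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 1);

int stamp = ref.getStamp(); // observe version 1

// another thread changes A -> B -> A, bumping the version each time
ref.compareAndSet("A", "B", stamp, stamp + 1);
ref.compareAndSet("B", "A", stamp + 1, stamp + 2);

// CAS with the stale version now fails even though the value is back to "A"
boolean success = ref.compareAndSet("A", "C", stamp, stamp + 1);
System.out.println("CAS with stale stamp succeeded: " + success); // prints false
}
}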

More on Atomics

Taxonomy of atomic classes

There are a total of sixteen atomic classes divided into four groups:

  1. Scalars
  2. Field updaters
  3. Arrays
  4. Compound variables

Most well-known and commonly used are the scalar ones such as AtomicInteger, AtomicLong, and AtomicReference, which support CAS (compare-and-set). Atomic variables of other primitive types can be simulated: short or byte values can be cast to and from int, and floating point numbers can be converted using Float.floatToIntBits() and Double.doubleToLongBits(). Atomic scalar classes extend Number and don’t redefine hashCode() or equals().

Atomics are not primitives

Note, that the Integer class has the same hashcode for the same integer value but that’s not the case for AtomicInteger. Thus Atomic* scalar classes are unsuitable as keys for collections that rely on hashcode.

import java.util.concurrent.atomic.AtomicInteger;

class Demonstration {
public static void main( String args[] ) {
AtomicInteger atomicFive = new AtomicInteger(5);
AtomicInteger atomicAlsoFive = new AtomicInteger(5);

System.out.println("atomicFive.equals(atomicAlsoFive) : " + atomicFive.equals(atomicAlsoFive));
System.out.println("atomicFive.hashCode() == atomicAlsoFive.hashCode() : " + (atomicFive.hashCode() == atomicAlsoFive.hashCode()));


Integer integer1 = new Integer(23235);
Integer integer2 = new Integer(23235);

System.out.println("integer1.equals(integer2) : " + integer1.equals(integer2));
System.out.println("integer1.hashCode() == integer2.hashCode() : " + (integer1.hashCode() == integer2.hashCode()));
}
}

Output
atomicFive.equals(atomicAlsoFive) : false
atomicFive.hashCode() == atomicAlsoFive.hashCode() : false
integer1.equals(integer2) : true
integer1.hashCode() == integer2.hashCode() : true

Performance of atomics vs locks

In the case of a single thread, i.e. zero contention environment, an operation that relies on CAS (e.g. updating an AtomicInteger) will be faster than an operation that involves locking first.

Atomics are generally preferable to lock-based solutions. In a highly contended environment the majority of threads will waste CPU cycles retrying CAS operations, but using locks in the same situation would have the threads suspended and then resumed later, which is usually costlier.

Keeping state thread local

Finally, the best choice for scalability and performance is to share as little state as possible among threads. Keeping variables and state, thread local results in maximum performance and elimination of contention. Even though atomics achieve better scalability than using locks, choosing not to share any state among threads will result in the best scalability.
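For example, ThreadLocal gives each thread its own independent copy of a variable, so there is no shared state to contend for. A minimal sketch:

class ThreadLocalDemo {

// each thread sees its own counter; no synchronization is needed
private static final ThreadLocal<Integer> counter = ThreadLocal.withInitial(() -> 0);

public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
counter.set(counter.get() + 1); // thread-confined update
}
System.out.println(Thread.currentThread().getName() + " counted " + counter.get());
};

Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join(); // each thread prints 1000; nothing was shared
}
}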

Non-blocking Synchronization

Non-blocking algorithms use machine-level atomic instructions such as compare-and-swap instead of locks to provide data integrity when multiple threads access shared resources.

As the name suggests, non-blocking algorithms don’t block when multiple threads contend for the same data, and as a consequence greatly reduce scheduling overhead. These algorithms don’t suffer from deadlocks, liveness issues and individual thread failures.

  • An algorithm is called non-blocking if the failure or suspension of a thread doesn’t cause the failure or suspension of another thread.
  • An algorithm is called lock free if at every step of the algorithm some thread participant of the algorithm can make progress.

Let’s say we want to create a non-blocking counter.

First, for comparison, a lock-based counter that uses synchronized methods so that it is thread-safe:

class LockBasedCounter {
private long value;

public synchronized long getValue() {
return value;
}

// When multiple threads attempt to invoke the method at the same time,
// only one is allowed to do so while the rest are suspended
public synchronized void increment() {
value++;
}
}

The alternative is non-blocking synchronisation, where threads don’t get suspended in the face of contention. To build one, we use CAS instructions.

public class SimulatedCAS {

// Let's assume for simplicity our value is a long
private long value = 0;

// constructor to initialize the value
public SimulatedCAS(long initValue) {
value = initValue;
}

synchronized long getValue() {
return value;
}

// The synchronized keyword causes all the steps in this method to execute
// atomically, which is akin to simulating the compare and swap processor
// instruction. The behavior of the function is as follows:
//
// 1. Return the expectedValue if the CAS instruction completes successfully, i.e.
// the newValue is written.
// 2. Return the current value if the CAS instruction doesn't complete successfully
//
// The method is setup such that when expectedValue equals the return value
// the caller can assume success.
synchronized long compareAndSwap(long expectedValue, long newValue) {

if (value == expectedValue) {
value = newValue;
return expectedValue;
}

// return whatever is the current value
return value;
}

// This method uses the compareAndSwap() method to indicate if the CAS
// instruction completed successfully or not.
synchronized boolean compareAndSet(long expectedValue, long newValue) {
return compareAndSwap(expectedValue, newValue) == expectedValue;
}
}
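A non-blocking counter built on top of the SimulatedCAS class might then look like the following sketch (the class name is ours):

class NonBlockingCounter {

private final SimulatedCAS value = new SimulatedCAS(0);

long getValue() {
return value.getValue();
}

// read the current value, attempt to CAS in the incremented value,
// and retry if another thread changed the value in the meantime
long increment() {
long current;
do {
current = value.getValue();
} while (!value.compareAndSet(current, current + 1));
return current + 1;
}
}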

CAS performs better than locks. Acquiring a lock usually involves at least one CAS operation and peripheral lock-related housekeeping tasks, which implies more work is done by a lock-based counter in the best case of zero contention compared to a CAS-based counter.

There are well-known non-blocking algorithms for commonly used data structures such as hashtables, priority queues, stacks, linked-lists etc. However, designing non-blocking algorithms is much more complex than lock-based alternatives. Generally, non-blocking algorithms skirt many of the vices associated with lock-based approaches such as deadlocks or priority inversion, however, threads participating in a non-blocking algorithm can still experience starvation or livelocks as they may perform repeated retries without success.

Miscellaneous Topics

Lock Fairness

We’ll briefly touch on the topic of fairness in locks since it’s out of scope for this course. When locks get acquired by threads, there’s no guarantee of the order in which threads are granted access to the lock. A thread requesting the lock more frequently may acquire it an unfairly greater number of times than other threads. Java locks can be turned into fair locks by passing in the fair constructor parameter. However, fair locks exhibit lower throughput and are slower compared to their unfair counterparts.

new ReentrantLock(true);

Thread Pools

Imagine an application that creates threads to undertake short-lived tasks. The application would incur a performance penalty for first creating hundreds of threads and then tearing down the allocated resources for each thread at the end of its life. The general way programming frameworks solve this problem is by creating a pool of threads, which are handed out to execute each concurrent task; once a task completes, the thread is returned to the pool.

Java offers thread pools via its Executor Framework. The framework includes classes such as the ThreadPoolExecutor for creating thread pools.

Java Memory Model

In layman’s terms, you can think of a memory model as a set of rules which determine when writes by one thread are visible to another thread.

Consequently, the model permits the compiler, the processor, or the runtime to reorder memory operations or program statements for optimization and performance reasons, as long as the write visibilities guaranteed by the model aren’t violated.

…. it’s really complex..i don’t understand much in this section…gonna skip this for now…

References

Setting up threads

(1) Runnable interface


class Demonstration {
public static void main( String args[] ) {

ExecuteMe executeMe = new ExecuteMe();
Thread t = new Thread(executeMe);
t.start();
}
}

class ExecuteMe implements Runnable {

public void run() {
System.out.println("Say Hello");
}

}

(2) Subclassing Thread class

class Demonstration {
public static void main( String args[] ) throws Exception {
ExecuteMe executeMe = new ExecuteMe();
executeMe.start();
executeMe.join();

}
}

class ExecuteMe extends Thread {

@Override
public void run() {
System.out.println("I ran after extending Thread class");
}

}

Implementing Runnable is preferred because Java only supports single inheritance of classes but allows implementing multiple interfaces. If you extend the Thread class, you can’t extend any other class.

Basic Thread handling

Joining Threads

A thread is always created by another thread except for the main application thread. You may wonder what happens to the innerThread if the main thread finishes execution before the innerThread is done?

class Demonstration {
public static void main( String args[] ) throws InterruptedException {

ExecuteMe executeMe = new ExecuteMe();
Thread innerThread = new Thread(executeMe);
innerThread.setDaemon(true);
innerThread.start();
}
}

class ExecuteMe implements Runnable {

public void run() {
while (true) {
System.out.println("Say Hello over and over again.");
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
// swallow interrupted exception
}
}
}
}

If you execute the above code, you’ll see no output. That is because the main thread exits right after starting the innerThread; once it exits, the JVM also kills the spawned thread. The innerThread.setDaemon(true) call marks innerThread as a daemon thread, which we’ll talk about shortly, and is responsible for innerThread being killed as soon as the main thread completes execution.

If we want the main thread to wait for the innerThread to finish before proceeding forward, we can direct the main thread to suspend its execution by calling join method on the innerThread object right after we start the innerThread. The change would look like the following.

 Thread innerThread = new Thread(executeMe);
innerThread.start();
innerThread.join();

Daemon Threads

A daemon thread runs in the background but as soon as the main application thread exits, all daemon threads are killed by the JVM. A thread can be marked daemon as follows:

innerThread.setDaemon(true);

Note that if a spawned thread isn’t marked as a daemon, then even if the main thread finishes execution, the JVM will wait for the spawned thread to finish before tearing down the process.

Sleeping Threads

A thread can be made dormant for a specified period using the sleep method. However, be wary of using sleep as a means of coordination among threads.

class SleepThreadExample {
public static void main( String args[] ) throws Exception {
ExecuteMe executeMe = new ExecuteMe();
Thread innerThread = new Thread(executeMe);
innerThread.start();
innerThread.join();
System.out.println("Main thread exiting.");
}
static class ExecuteMe implements Runnable {

public void run() {
System.out.println("Hello. innerThread going to sleep");
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// swallow interrupted exception
}
}
}
}

Interrupt Threads

class HelloWorld {
public static void main( String args[] ) throws InterruptedException {
ExecuteMe executeMe = new ExecuteMe();
Thread innerThread = new Thread(executeMe);
innerThread.start();

// Interrupt innerThread after waiting for 5 seconds
System.out.println("Main thread sleeping at " + +System.currentTimeMillis() / 1000);
Thread.sleep(5000);
innerThread.interrupt();
System.out.println("Main thread exiting at " + +System.currentTimeMillis() / 1000);
}

static class ExecuteMe implements Runnable {

public void run() {
try {
// sleep for a thousand minutes
System.out.println("innerThread goes to sleep at " + System.currentTimeMillis() / 1000);
Thread.sleep(1000 * 1000);
} catch (InterruptedException ie) {
System.out.println("innerThread interrupted at " + +System.currentTimeMillis() / 1000);
}
}
}

}

Executor Framework

Used for thread housekeeping; this is the recommended way to manage threads in a production environment.

A task is a logical unit of work. Usually, a task should be independent of other tasks so that it can be completed by a single thread. A task can be represented by an object of a class implementing the Runnable interface. We can consider HTTP requests being fielded by a web server as tasks that need to be processed. The queries a database server handles can similarly be thought of as independent tasks.

In Java, the primary abstraction for executing logical tasks units is the Executor framework and not the Thread class. The classes in the Executor framework separate out:

  • Task Submission
  • Task Execution

The framework allows us to specify different policies for task execution. Java offers three interfaces, which classes can implement to manage the thread lifecycle. These are:

  • Executor
  • ExecutorService
  • ScheduledExecutorService

The Executor interface forms the basis for the asynchronous task execution framework in Java.

You don’t need to create your own executor class as Java’s java.util.concurrent package offers several types of executors that are suitable for different scenarios. However, as an example, we create a dumb executor which implements the Executor Interface.

import java.util.concurrent.Executor;
class ThreadExecutorExample {

public static void main( String args[] ) {
DumbExecutor myExecutor = new DumbExecutor();
MyTask myTask = new MyTask();
myExecutor.execute(myTask);
}

static class DumbExecutor implements Executor {
// Takes in a runnable interface object
public void execute(Runnable runnable) {
Thread newThread = new Thread(runnable);
newThread.start();
}
}

static class MyTask implements Runnable {
public void run() {
System.out.println("Mytask is running now ...");
}
}

}

The Executor requires implementing classes to define a method execute(Runnable runnable) which takes in an object of interface Runnable. Fortunately, we don't need to define complex executors as Java already provides several that we'll explore in following chapters.

Executor Implementations

Executors are based on consumer-producer patterns. The tasks we produce for processing are consumed by threads. To better our understanding of how threads behave, imagine you are hired by a hedge fund on Wall Street and you are asked to design a method that can process client purchase orders as soon as possible. Let’s see what are the possible ways to design this method.

  1. sequential … (no concurrency)
  2. Create a new thread to receive each new request…(Bad…end up creating many many threads)
  3. Thread pools (Ideal way)

Thread Pools

Thread pools in Java are implementations of the Executor interface or any of its sub-interfaces. Thread pools allow us to decouple task submission and execution.

A thread pool consists of homogenous worker threads that are assigned to execute tasks. Once a worker thread finishes a task, it is returned to the pool. Usually, thread pools are bound to a queue from which tasks are dequeued for execution by worker threads.

  • There’s no latency when a request is received and processed by a thread because no time is lost in creating a thread.
  • The system will not go out of memory because threads are not created without any limits.
  • Fine tuning the thread pool will allow us to control the throughput of the system. We can have enough threads to keep all processors busy but not so many as to overwhelm the system.
  • The application will degrade gracefully if the system is under load.
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Note: Order and waitForNextOrder() are assumed to be defined elsewhere.
void receiveAndExecuteClientOrdersBest() {

int expectedConcurrentOrders = 100;
Executor executor = Executors.newFixedThreadPool(expectedConcurrentOrders);

while (true) {
final Order order = waitForNextOrder();

executor.execute(new Runnable() {

public void run() {
order.execute();
}
});
}
}

Types of Thread Pools

Java has preconfigured thread pool implementations that can be instantiated using the factory methods of the Executors class. The important ones are listed below:

  • newFixedThreadPool: This type of pool has a fixed number of threads and any number of tasks can be submitted for execution. Once a thread finishes a task, it can be reused to execute another task from the queue. -> newFixedThreadPool(5);
  • newSingleThreadExecutor: This executor uses a single worker thread to take tasks off the queue and execute them. If the thread dies unexpectedly, the executor will replace it with a new one. -> .newSingleThreadExecutor()
  • newCachedThreadPool: This pool will create new threads as required and reuse older ones when they become available. However, it’ll terminate threads that remain idle for a certain configurable period of time to conserve memory. This pool can be a good choice for short-lived asynchronous tasks; it does not have a fixed number of threads and dynamically adjusts its size based on the demands of the submitted tasks. -> Executors.newCachedThreadPool();
  • newScheduledThreadPool: This pool can be used to execute tasks periodically or after a delay (see the sketch after this list). -> Executors.newScheduledThreadPool(3);
  • There is also another kind of pool which we’ll only mention in passing as it’s not widely used: ForkJoinPool. A preconfigured version of it can be instantiated using the factory method Executors.newWorkStealingPool(). These pools are used for tasks which fork into smaller subtasks and then join results once the subtasks are finished to give an uber result. It’s essentially the divide and conquer paradigm applied to tasks.
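A minimal sketch of the scheduled pool mentioned above (the task and timings are arbitrary):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledPoolExample {

public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// run after an initial delay of 1 second, then every 2 seconds
scheduler.scheduleAtFixedRate(() -> System.out.println("heartbeat"), 1, 2, TimeUnit.SECONDS);

Thread.sleep(7000); // let a few heartbeats fire
scheduler.shutdown();
}
}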

An executor has the following stages in its life-cycle:

  • Running
  • Shutting Down
  • Terminated

As mentioned earlier, the JVM can’t exit unless all non-daemon threads have terminated. Executors can be made to shut down either abruptly or gracefully. When doing the former, the executor attempts to cancel all tasks in progress and doesn’t work on any enqueued ones, whereas when doing the latter, the executor gives a chance for tasks already in execution to complete but also completes the enqueued tasks. If shutdown is initiated then the executor will refuse to accept new tasks and if any are submitted, they can be handled by providing a RejectedExecutionHandler.
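A minimal sketch of the two shutdown styles, graceful via shutdown() and abrupt via shutdownNow():

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownExample {

public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> System.out.println("task running"));

// graceful: stop accepting new tasks but let running and queued tasks finish
executor.shutdown();

if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
// abrupt: attempt to cancel running tasks; tasks still in the queue are returned
List<Runnable> neverRan = executor.shutdownNow();
System.out.println(neverRan.size() + " queued tasks never ran");
}
}
}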

ThreadPoolExecutor

Generally the use of ThreadPoolExecutor is discouraged in favor of thread pools that can be instantiated using the Executor factory methods. These thread pools come with pre-configured settings that are commonly used in most scenarios, however, the ThreadPoolExecutor comes with several knobs and parameters that can be fine-tuned to suit unusual use-cases.

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

The arguments corePoolSize and maximumPoolSize together determine the number of threads that get created in the pool. When the pool has fewer than corePoolSize threads and a new task arrives, a new thread is instantiated even if other threads in the pool are idle. When the pool has more than corePoolSize but fewer than maximumPoolSize threads, a new thread is only created if the queue that holds the submitted tasks is full. The maximum number of threads that can be created is capped by maximumPoolSize.

Note that a newly instantiated pool creates core threads only when tasks start arriving in the queue. However, this behavior can be tweaked by invoking one of the prestartCoreThread() or prestartAllCoreThreads() methods, which is a good idea when creating a pool with a non-empty queue.

import java.util.concurrent.*;

class Demonstration {
public static void main( String args[] ) throws InterruptedException {

// create an instance of the ThreadPoolExecutor
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 5, 1,
TimeUnit.MINUTES, new LinkedBlockingDeque<>(3), new ThreadPoolExecutor.AbortPolicy());

try {
// submit six tasks
for (int i = 0; i < 6; i++) {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("This is worker thread " + Thread.currentThread().getName() + " executing");
try {
// simulate work by sleeping for 1 second
Thread.sleep(1000);
} catch (InterruptedException ie) {
// ignore for now
}
}
});
}

} finally {
threadPoolExecutor.shutdown();
}
}
}

Queuing strategy -> The choice of queue we pass in determines the queuing strategy for the executor.

The strategies are

  1. Direct Handoffs -> An object running in one thread syncs up with an object running in another thread to hand off a piece of information, event or task. SynchronousQueue class can be used for this. Here, an item can only be inserted in the queue if another thread is simultaneously removing it.
import java.util.concurrent.*;

class Demonstration {
public static void main( String args[] ) throws InterruptedException {
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();

// The following statement blocks the main thread as there is no corresponding
// thread to dequeue the item being placed in the synchronous queue.
synchronousQueue.put(7);
}
}

If a SynchronousQueue is passed to the ThreadPoolExecutor, it means whenever a new task is submitted to the executor, it will be handed off by the queue to one of the pool threads for execution (the queue does not hold any tasks).

However, if the tasks submitted exceed the maximumPoolSize, the queue holds no tasks, and no free threads are available, then the newly submitted tasks are rejected.

If we set maximumPoolSize to an effectively unbounded value such as Integer.MAX_VALUE, the pool can grow indefinitely.

2. Unbounded queues -> A LinkedBlockingQueue with no predefined capacity can grow arbitrarily in size. The consequence is that tasks get added to the queue whenever all the corePoolSize threads are busy; the maximumPoolSize setting has no effect and only corePoolSize threads are ever created, while the queue size can keep growing indefinitely.

import java.util.concurrent.*;

class Demonstration {
public static void main( String args[] ) throws InterruptedException {
// create a ThreadPoolExecutor with a LinkedBlockingDeque to implement the unbounded queue strategy. The pool has
// a maximum of 5 threads. Since we aren't passing-in the RejectionHandler, the default AbortPolicy will be used.
// Note that the maximumPoolSize setting doesn't have any effect since only corePoolSize threads are ever created
// because the queue has indefinite (theoretically) capacity.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
TimeUnit.MINUTES, new LinkedBlockingDeque<>());

int i = 0;
try {

// Try to submit 20 tasks
for (; i < 20; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// simulate work by sleeping for 1 second
System.out.println("Thread " + Thread.currentThread().getName() + " at work.");
Thread.sleep(1000);
} catch (InterruptedException ie) {
// ignore for now
}
}
});
}
} catch (RejectedExecutionException ree) {
// Let's see which task gets rejected
System.out.println("Task " + (i + 1) + " rejected.");
} finally {
// don't forget to shutdown the executor
threadPoolExecutor.shutdown();

// wait for the executor to shutdown
threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
}
}
}

Of course, we can also bound the queue by passing a capacity, e.g. new LinkedBlockingQueue<>(5).

3. Bounded queues

Need to consider the queue size and the number of threads together; there is a tradeoff. Large queues with small pools reduce CPU usage and context-switching overhead but can throttle throughput, while small queues require larger pools, which keep the CPU busier at the cost of more scheduling overhead.

Task rejection

There are four different policies that can be supplied to the executor to determine the course of action when tasks can’t be accepted any more. These policies are represented by four classes that implement the RejectedExecutionHandler interface: ThreadPoolExecutor.AbortPolicy (the default, which throws RejectedExecutionException), CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy. The executor invokes the rejectedExecution() method of the supplied RejectedExecutionHandler when a task is intended for rejection.
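For instance, CallerRunsPolicy makes the submitting thread execute a rejected task itself, which applies back-pressure on the producer. A minimal sketch:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyExample {

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1), // a tiny queue that fills up quickly
new ThreadPoolExecutor.CallerRunsPolicy()); // rejected tasks run in the caller

// some of these tasks will print the main thread's name
for (int i = 0; i < 5; i++) {
executor.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
}

executor.shutdown();
}
}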

Callable interface

In the previous sections we used the Runnable interface as the abstraction for tasks that were submitted to the executor service. The Runnable interface’s sole run method doesn’t return a value, which is a handicap for tasks that don’t want to write results to global or shared data structures. The Callable interface allows such tasks to return results.

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

Note the interface also allows a task to throw an exception. A task goes through the various stages of its life which include the following:

  • created
  • submitted
  • started
  • completed
    class SumTask implements Callable<Integer> {

int n;

public SumTask(int n) {
this.n = n;
}

public Integer call() throws Exception {

if (n <= 0)
return 0;

int sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
}

return sum;
}
}



final int n = 10;
Callable<Integer> sumTask = new Callable<Integer>() {

public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= n; i++)
sum += i;
return sum;
}
};

Future Interface

The Future interface is used to represent the result of an asynchronous computation. The interface also provides methods to check the status of a submitted task and also allows the task to be cancelled if possible.

Here is how you could use it

import java.util.concurrent.Callable;

public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
// Define the task to be executed by the thread
// ...
return 42; // Return the result of the task
}
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
MyCallable myCallable = new MyCallable();
Future<Integer> future = executor.submit(myCallable);

// Perform other operations while the task is executing...

Integer result = future.get(); // Retrieve the result of the task
System.out.println("Result: " + result);

executor.shutdown(); // Shutdown the executor service
}
}

Another example:


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Demonstration {

// Create and initialize a threadpool
static ExecutorService threadPool = Executors.newFixedThreadPool(2);

public static void main( String args[] ) throws Exception {
System.out.println( "sum :" + findSum(10));
threadPool.shutdown();
}

static int findSum(final int n) throws ExecutionException, InterruptedException {

Callable<Integer> sumTask = new Callable<Integer>() {

public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= n; i++)
sum += i;
return sum;
}
};

Future<Integer> f = threadPool.submit(sumTask);
return f.get();
}


}

Thread pools implementing the ExecutorService interface return a future from their task submission methods. In the above code, threadPool.submit(sumTask) returns a future, and we retrieve the result of the task by invoking the get method on it. The get method will return the result or throw an instance of ExecutionException.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


class Demonstration {

static ExecutorService threadPool = Executors.newFixedThreadPool(2);

public static void main( String args[] ) throws Exception {
System.out.println( " sum: " + findSumWithException(10));
threadPool.shutdown();
}

static int findSumWithException(final int n) throws ExecutionException, InterruptedException {

int result = -1;

Callable<Integer> sumTask = new Callable<Integer>() {

public Integer call() throws Exception {
throw new RuntimeException("something bad happened.");
}
};

Future<Integer> f = threadPool.submit(sumTask);

try {
result = f.get();
} catch (ExecutionException ee) {
System.out.println("Something went wrong. " + ee.getCause());
}

return result;
}

}

The get method is a blocking call which blocks till the task completes.

We can also poll if a job has completed or cancel a task.


Future<Integer> f1 = threadPool.submit(sumTask1);
Future<Void> f2 = threadPool.submit(randomTask);
// Poll for completion of first task
try {

// Before we poll for completion of second task,
// cancel the second one
f2.cancel(true);

// Polling the future to check the status of the
// first submitted task
while (!f1.isDone()) {
System.out.println("Waiting for first task to complete.");
}
result = f1.get();
} catch (ExecutionException ee) {
System.out.println("Something went wrong.");
}

Syntax

  • If you need to retrieve the result of the task or handle any exceptions thrown by the task, use submit().
  • If you have a simple task without a return value or exceptions to handle, and you don’t need to interact with the task’s result, use execute() for simplicity.

FutureTask

Java also provides an implementation of the Future interface called FutureTask.

It can wrap a callable or runnable object and in turn be submitted to an executor.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class Demonstration {

@SuppressWarnings("unchecked")
public static void main( String args[] ) throws Exception{

FutureTask<Integer> futureTask = new FutureTask(new Callable() {

public Object call() throws Exception {
try{
Thread.sleep(1);
}
catch(InterruptedException ie){
// swallow exception
}
return 5;
}
});

ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future duplicateFuture = threadPool.submit(futureTask);

// Awful idea to busy wait
while (!futureTask.isDone()) {
System.out.println("Waiting");
}

if(duplicateFuture.isDone() != futureTask.isDone()){
System.out.println("This should never happen.");
}

System.out.println((int)futureTask.get());

threadPool.shutdown();
}
}

Why would we want to wrap a task and feed it to the thread pool, when the return value of threadPool.submit() is already a Future object?

Well, FutureTask provides the following benefits:

  1. Explicit control over task execution: you can manually invoke execution by calling the run() method on the FutureTask instance.
  2. It allows you to use the advanced functionality of a future without relying on an executor.
  3. It has cancellation features.
  4. It allows customisation and extension.

CompletionService Interface

In the previous lesson we discussed how tasks can be submitted to executors but imagine a scenario where you want to submit hundreds or thousands of tasks. You’ll retrieve the future objects returned from the submit calls and then poll all of them in a loop to check which one is done and then take appropriate action. Java offers a better way to address this use case through the CompletionService interface.

You can use the ExecutorCompletionService as a concrete implementation of the interface.

The completion service is a combination of a blocking queue and an executor. Tasks are submitted to the service, and the service can then be queried for completed tasks. It exposes two retrieval methods: poll(), which returns null if no task is completed or none were submitted, and take(), which blocks until a completed task is available.

import java.util.Random;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


class Demonstration {

static Random random = new Random(System.currentTimeMillis());

public static void main( String args[] ) throws Exception {
completionServiceExample();
}


static void completionServiceExample() throws Exception {

class TrivialTask implements Runnable {

int n;

public TrivialTask(int n) {
this.n = n;
}

public void run() {
try {
// sleep for up to 100 milliseconds to simulate work
Thread.sleep(random.nextInt(101));
System.out.println(n*n);
} catch (InterruptedException ie) {
// swallow exception
}
}
}

ExecutorService threadPool = Executors.newFixedThreadPool(3);
ExecutorCompletionService<Integer> service =
new ExecutorCompletionService<Integer>(threadPool);

// Submit 10 trivial tasks.
for (int i = 0; i < 10; i++) {
service.submit(new TrivialTask(i), Integer.valueOf(i));
}

// wait for all tasks to get done
int count = 10;
while (count != 0) {
Future<Integer> f = service.poll();
if (f != null) {
System.out.println("Thread" + f.get() + " got done.");
count--;
}
}

threadPool.shutdown();
}

}

InvokeAll VS CompletionService

invokeAll() and CompletionService are two different mechanisms in Java for handling the results of multiple tasks, but they serve different purposes and have different usage patterns.

invokeAll():

  • invokeAll() is a method provided by the ExecutorService interface.
  • It takes a collection of tasks (represented by Callable objects) as input.
  • It submits all the tasks for execution and waits for all of them to complete.
  • The return value is a list of Future objects representing the results of the tasks in the order they were provided.
  • The invokeAll() method blocks until all tasks have completed or the timeout (if specified) has elapsed.

Callable<String> task1 = () -> {
// Task 1 execution
// ...
return "Task 1 result";
};

Callable<String> task2 = () -> {
// Task 2 execution
// ...
return "Task 2 result";
};

// task3 defined analogously
Callable<String> task3 = () -> {
// Task 3 execution
// ...
return "Task 3 result";
};

List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);
List<Future<String>> results = executor.invokeAll(tasks);
for (Future<String> result : results) {
String taskResult = result.get();
// Process the task result
}

CompletionService:

  • CompletionService is an interface that helps manage asynchronous task execution and retrieve results in the order of completion.
  • It is typically used when you have multiple tasks and want to process the results as soon as they become available, rather than waiting for all tasks to complete.
  • It decouples the submission of tasks from the retrieval of results.
  • You submit tasks to the CompletionService, and it returns a Future representing the result of each task as soon as it is completed.
  • You can use the take() or poll() methods of the CompletionService to retrieve the completed Future objects and process the results.

CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
completionService.submit(task1);
completionService.submit(task2);
completionService.submit(task3);
for (int i = 0; i < 3; i++) {
Future<String> completedTask = completionService.take();
String taskResult = completedTask.get();
// Process the task result
}

In summary, invokeAll() is used when you want to submit a collection of tasks as a batch, wait for all tasks to complete, and retrieve their results in a specific order. On the other hand, CompletionService is used when you want to process the results of tasks as soon as they become available, regardless of the order in which they were submitted.

Concurrent Collections

To cater for multithreaded scenarios, the Collections framework provides static methods to wrap vanilla collections in thread-safe wrapper objects. These came to be known as wrapper collections.

Design pattern: Decorator pattern

 ArrayList<Integer> myList = new ArrayList<>();
List<Integer> syncList = Collections.synchronizedList(myList);

In Java 5, concurrent collections were introduced; they are much better than synchronized wrapper collections. This is primarily because their throughput is not reduced by the need to serialize access, as is the case with synchronized collections. Synchronized collections also suffer from the overhead of managing locks, which can be high if there is much contention.

Concurrent collections vs wrapper collections

The concurrent collections use a variety of ways to achieve thread-safety while avoiding traditional synchronization for better performance.

  • Copy on Write: Concurrent collections utilizing this scheme are suitable for read-heavy use cases. An immutable copy is created of the backing collection and whenever a write operation is attempted, the copy is discarded and a new copy with the change is created. Reads of the collection don’t require any synchronization, though synchronization is needed briefly when the new array is being created. Examples include CopyOnWriteArrayList and CopyOnWriteArraySet (see the sketch after this list).
  • Compare and Swap: Consider a computation in which the value of a single variable is used as input to a long-running calculation whose eventual result is used to update the variable. Traditional synchronization makes the whole computation atomic, excluding any other thread from concurrently accessing the variable. This reduces opportunities for parallel execution and hurts throughput. An algorithm based on CAS behaves differently: it makes a local copy of the variable and performs the calculation without getting exclusive access. Only when it is ready to update the variable does it call CAS, which in one atomic operation compares the variable’s value with its value at the start and, if they are the same, updates it with the new value. If they are not the same, the variable must have been modified by another thread; in this situation, the CAS thread can try the whole computation again using the new value, or give up, or — in some algorithms — continue, because the interference will have actually done its work for it! Collections using CAS include ConcurrentLinkedQueue and ConcurrentSkipListMap.
  • Lock: Some collection classes use Lock to divide up the collection into multiple parts that can be locked separately resulting in improved concurrency. For example, LinkedBlockingQueue has separate locks for the head and tail ends of the queue, so that elements can be added and removed in parallel. Other collections using these locks include ConcurrentHashMap and most of the implementations of BlockingQueue.
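A minimal sketch of the copy-on-write behavior referenced above (the values are arbitrary):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {

public static void main(String[] args) {
List<Integer> list = new CopyOnWriteArrayList<>(new Integer[] { 1, 2, 3 });

// the iterator operates on an immutable snapshot, so writes during
// traversal never throw ConcurrentModificationException ...
for (int i : list) {
list.add(i + 10); // ... but these additions aren't visible to this loop
}

System.out.println(list); // [1, 2, 3, 11, 12, 13]
}
}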

ConcurrentHashMap

HashMaps and Concurrency

ConcurrentHashMap is a thread-safe class and multiple threads can operate on it in parallel without incurring any of the issues that a HashMap may suffer from in a concurrent environment. For write operations, the entire map is never locked; rather, only a segment of the map is locked. However, retrieval or read operations generally don’t involve locking at all. So in the case of a read, the value set for a key by the most recently completed update operation is returned, i.e. a completed update operation on a given key bears a happens-before relationship with any (non-null) read operation. This does mean that a stale value may be returned if an update operation is in progress but not yet completed.

Since read operations can happen while update operations are ongoing, any concurrent reads during the execution of aggregate operations such as putAll() or clear() may reflect the insertion or removal of only some of the entries, when all or none are expected.

  • null can’t be inserted either as a key or a value.
  • The ConcurrentHashMap shards its data into segments and the segments are locked individually when being written to. Each segment can be written independently of other segments allowing multiple threads to operate on the map object.
  • The reads happen without locking for the majority of cases, thus making them synchronization-free and improving performance. However, note that there are certain minority scenarios when reads have to go through synchronization.
  • In general, using keys that evaluate to the same hashCode will slow down the performance of any hash map.

One common folly when working with ConcurrentHashMap is to assume that any accesses of, and operations on, the keys/values within the data structure are somehow magically thread-safe. The map doesn’t protect against external race conditions. Consider the program below, which has two threads each increment a key’s value in a ConcurrentHashMap a hundred times. Run the program multiple times and you’ll see each run print a different result.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Demonstration {

public static void main( String args[] ) throws Exception {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("Biden", 0);

ExecutorService es = Executors.newFixedThreadPool(5);

// create a task to increment the vote count
Runnable task = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++)
map.put("Biden", map.get("Biden") + 1);
}
};

// submit the task twice
Future future1 = es.submit(task);
Future future2 = es.submit(task);

// wait for the threads to finish
future1.get();
future2.get();

// shutdown the executor service
es.shutdown();

System.out.println("votes for Biden = " + map.get("Biden"));
}
}

One of the ways to fix the above program is to use an instance of the AtomicInteger class as the value. We’ll invoke the incrementAndGet() method to register an increment on the value.

import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class Demonstration {

public static void main( String args[] ) throws Exception {
ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
// create an atomic integer to keep the vote count
AtomicInteger ai = new AtomicInteger(0);
map.put("Biden", ai);

ExecutorService es = Executors.newFixedThreadPool(5);

// create a task to increment the vote count
Runnable task = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++)
// We are ignoring the returned updated value from the
// function call
map.get("Biden").incrementAndGet();
}
};

// submit the task twice
Future future1 = es.submit(task);
Future future2 = es.submit(task);

// wait for the threads to finish
future1.get();
future2.get();

// shutdown the executor service
es.shutdown();

System.out.println("votes for Biden = " + map.get("Biden").get());
}
}

Output is now 200.

Another question

If we use AtomicInteger as the value with the HashMap class, would our program output the correct result? The answer is yes for this naive/simple program, because the atomic integer itself is thread-safe, so multiple threads attempting to increment it do so serially. However, the data structure itself, i.e. the hash map, is thread-unsafe and can exhibit concurrency bugs when multiple threads operate on it, traverse its keys or values, or when the map resizes. Remember, we have to think about concurrency both at the map level and at the key/value level.
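Since Java 8, ConcurrentHashMap also offers atomic per-key update methods such as merge() and compute(), which perform the read-modify-write atomically for a given key. A minimal sketch of the vote counter using merge():

import java.util.concurrent.ConcurrentHashMap;

class MergeDemo {

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("Biden", 0);

Runnable task = () -> {
for (int i = 0; i < 100; i++)
map.merge("Biden", 1, Integer::sum); // atomic per-key read-modify-write
};

Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();

System.out.println("votes for Biden = " + map.get("Biden")); // always 200
}
}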

Another alternative is to use a custom counter class

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Demonstration {

// Class to keep track of vote count
static class MyCounter {
private int count = 0;

void increment() {
count++;
}

int getCount() {
return count;
}
}

public static void main( String args[] ) throws Exception {
ConcurrentHashMap<String, MyCounter> map = new ConcurrentHashMap<>();
map.put("Biden", new MyCounter());

ExecutorService es = Executors.newFixedThreadPool(5);

// create a task to increment the vote count
Runnable task = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
MyCounter mc = map.get("Biden");

// explicit synchronization
synchronized (mc) {
mc.increment();
}
}
}
};

// submit the task twice
Future future1 = es.submit(task);
Future future2 = es.submit(task);

// wait for the threads to finish
future1.get();
future2.get();

// shutdown the executor service
es.shutdown();

System.out.println("votes for Biden = " + map.get("Biden").getCount());
}
}

HashMap VS HashTable VS ConcurrentHashMap vs Collections.synchronizedMap(...)??

As already discussed HashMap isn’t thread-safe and if your scenario doesn’t involve multiple threads, pick HashMap as it will provide the best performance.

Hashtable is a synchronized map, i.e. it can only be accessed by a single reader or writer thread at a time. Hashtable performs poorly in a highly concurrent environment. Iterators on a Hashtable observe fail-fast behavior, i.e. they throw a ConcurrentModificationException if the Hashtable is structurally modified at any time after the iterator is created, in any way except through the iterator’s own remove method. Hashtable is part of Java’s legacy code and was retrofitted to implement the Map interface so that it could become part of Java’s collections framework.

Collections.synchronizedMap(...) creates an instance of the SynchronizedMap class (a private class in Collections class) backed by the passed-in map. The returned object synchronizes access to the backing map allowing a single thread to operate on the map at a time. For example:

Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());

The user must synchronize manually on the returned map object (not the backing map) when traversing any of the collection views using iterators, split iterators or streams.
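For example, a minimal sketch:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class SynchronizedMapIteration {

public static void main(String[] args) {
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
map.put("a", 1);
map.put("b", 2);

// individual calls such as get()/put() are synchronized automatically,
// but iteration spans many calls and must be guarded manually
synchronized (map) {
for (Map.Entry<String, Integer> entry : map.entrySet()) {
System.out.println(entry.getKey() + " = " + entry.getValue());
}
}
}
}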

A difference between Hashtable and Collections.synchronizedMap(...) is that in the case of Hashtable all the methods are synchronized on the map object itself, i.e. the this object. In the case of Collections.synchronizedMap(...), a separate object (not the backing map) is used for synchronization. But essentially both Hashtable and Collections.synchronizedMap(...) have similar synchronization behavior. Note that a synchronized map backed by a Hashtable simply adds a redundant synchronization layer and should never be used.

Finally, the ConcurrentHashMap uses sophisticated techniques to reduce the need for synchronization when multiple threads access the map in parallel. It is highly scalable and concurrent and can be iterated upon without requiring synchronization.

ConcurrentModificationException

The name ConcurrentModificationException may sound related to concurrency; however, the exception can be thrown even while a single thread operates on a map:

Map<String, Integer> map = new HashMap<>();
map.put("key-0", 0);
Iterator<String> it = map.keySet().iterator();
int i = 1;
while (it.hasNext()) {
    // Add a new key/value pair while the map is being traversed:
    // the next call to it.next() throws ConcurrentModificationException.
    map.put("key-" + i, i);
    it.next();
    i++;
}

It is not only HashMap that suffers from ConcurrentModificationException; other maps exhibit the same behavior. The only map designed to be concurrently modified while being traversed is ConcurrentHashMap.

Even though the ConcurrentHashMap can undergo concurrent modifications (additions, deletions, updates) at the same time as its elements are being traversed, the modifications may not be reflected during the traversal.

As a user of ConcurrentHashMap, one has to be cognizant of this limitation of its iterators/enumerations: they are weakly consistent and reflect the state of the map at the time the iterator/enumeration was created, or possibly some later state.
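A small sketch of this weakly consistent behavior (the keys are arbitrary):

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

class WeaklyConsistentIteration {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);

        Iterator<String> it = map.keySet().iterator();
        // Modifying the map during traversal throws no exception here,
        // but this particular iterator may or may not observe "b".
        map.put("b", 2);

        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}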

Lock Interface

The Lock interface has the following classes implementing it:

  1. ReentrantLock
  2. ReentrantReadWriteLock.ReadLock
  3. ReentrantReadWriteLock.WriteLock
Locks offer additional functionality and far more flexibility in usage than synchronized methods and statements. The canonical usage pattern is:

Lock ourLock = new ReentrantLock(); // ... or any other Lock implementation

ourLock.lock();
try {
    // ... perform operations while holding the lock
} finally {
    ourLock.unlock();
}

Unlike a synchronized block, which releases the monitor automatically, developers must remember to release a Lock explicitly; that is why the unlock() call sits in a finally block.
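Part of that extra flexibility is non-blocking and timed acquisition. A minimal sketch using tryLock (the method name updateIfAvailable and the 100 ms timeout are made up for illustration):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockSketch {

    private final Lock lock = new ReentrantLock();

    boolean updateIfAvailable() throws InterruptedException {
        // Wait at most 100 ms for the lock instead of blocking forever.
        if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                // ... perform the guarded operation
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false; // could not acquire the lock in time
    }
}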

Finally, since a Lock implementation is also an object, it can be used as the argument of a synchronized statement, but such use is discouraged other than within the internal implementation of the class. For example, the following is bad practice:

Lock ourLock = new ReentrantLock(); // ... or any other Lock implementation

synchronized (ourLock) {
    // ... not a good idea: this locks the object's built-in monitor,
    // which is unrelated to the Lock's own locking state
}

ReentrantLock

The ReentrantLock implements the Lock interface and is functionally similar to the implicit monitor lock accessed using synchronized methods and statements.

The lock is said to be owned by the thread that locks it, and any other thread attempting to lock it will block. A thread that already owns the lock will return immediately if it invokes lock() again. This reentrant behavior allows the owning thread to lock recursively; however, the lock supports a maximum of 2,147,483,647 (Integer.MAX_VALUE) recursive acquisitions by the same thread.
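A minimal sketch of the reentrant behavior (the class and method names are illustrative); getHoldCount() reports how many times the owning thread has acquired the lock:

import java.util.concurrent.locks.ReentrantLock;

class ReentrantDemo {

    private final ReentrantLock lock = new ReentrantLock();

    void outer() {
        lock.lock();
        try {
            inner(); // re-acquiring the same lock does not block
        } finally {
            lock.unlock();
        }
    }

    void inner() {
        lock.lock(); // hold count rises to 2 for the owning thread
        try {
            System.out.println("hold count = " + lock.getHoldCount());
        } finally {
            lock.unlock();
        }
    }
}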

The ReentrantLock can also be operated in fair mode (constructed with new ReentrantLock(true)), where the lock is granted to the longest-waiting thread, so no thread experiences starvation.

ReadWriteLock Interface

The lock is intended to allow multiple readers to read at a time but only allow a single writer to write.

The only implementing class for the interface is ReentrantReadWriteLock.

The ReentrantReadWriteLock can be locked by multiple readers at the same time while writer threads have to wait.

Conversely, the ReentrantReadWriteLock can be locked by a single writer thread at a time and other writer or reader threads have to wait for the lock to be free.

The ReentrantReadWriteLock as the name implies allows threads to recursively acquire the lock.

It is a good fit when reads greatly outnumber writes.

Cache example

One common scenario where there can be multiple readers and writers is that of a cache. A cache is usually used to speed up read requests from another data source e.g. data from hard disk is cached in memory so that a request doesn’t have to wait for data to be fetched from the hard disk thus saving I/O. Usually, there are multiple readers trying to read from the cache and it is imperative that the readers don’t step over writers or vice versa.

In the simple case, we can relax the requirements and allow readers to read stale data from the cache. We can imagine a single writer thread that periodically writes to the cache, while readers don’t mind if the data gets stale before the next update by the writer thread. In this scenario, the only caution to exercise is to make sure no readers are reading the cache while a writer is in the process of writing to it.

import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Demonstration {

    static Random random = new Random();

    public static void main(String args[]) throws Exception {

        ExecutorService es = Executors.newFixedThreadPool(15);

        // cache
        HashMap<String, Object> cache = new HashMap<>();
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        // put some data in the cache
        cache.put("key", -1);

        Runnable writerTask = new Runnable() {
            @Override
            public void run() {
                writerThread(cache, lock);
            }
        };

        Runnable readerTask = new Runnable() {
            @Override
            public void run() {
                readerThread(cache, lock);
            }
        };

        try {
            // submit tasks for execution
            Future<?> future1 = es.submit(writerTask);
            Future<?> future2 = es.submit(readerTask);
            Future<?> future3 = es.submit(readerTask);
            Future<?> future4 = es.submit(readerTask);
            Future<?> future5 = es.submit(readerTask);

            // wait for tasks to finish
            future1.get();
            future2.get();
            future3.get();
            future4.get();
            future5.get();
        } finally {
            es.shutdown();
        }
    }

    static void writerThread(HashMap<String, Object> cache, ReadWriteLock lock) {

        for (int i = 0; i < 9; i++) {
            try {
                Thread.sleep(random.nextInt(50));
            } catch (InterruptedException ie) {
                // ignore
            }

            lock.writeLock().lock();
            try {
                System.out.println("Acquired write lock");
                cache.put("key", random.nextInt(1000));
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    static void readerThread(HashMap<String, Object> cache, ReadWriteLock lock) {

        for (int i = 0; i < 3; i++) {
            try {
                Thread.sleep(random.nextInt(100));
            } catch (InterruptedException ie) {
                // ignore
            }

            lock.readLock().lock();
            try {
                System.out.println("Acquired read lock and reading key = " + cache.get("key"));
            } finally {
                lock.readLock().unlock();
            }
        }
    }
}

StampedLock

StampedLock provides an alternative to traditional locking mechanisms like ReentrantLock and ReadWriteLock, with the added feature of optimistic reads.

StampedLock supports three types of locks:

  1. Read Lock: Multiple threads can acquire the read lock simultaneously as long as no thread holds the write lock. Read locks are compatible with each other but not with the write lock.
  2. Write Lock: Only one thread can hold the write lock, and it excludes all other read and write locks.
  3. Optimistic Read: This is not a true lock. The tryOptimisticRead() method returns a stamp immediately without blocking (the stamp is zero if the write lock is currently held). After obtaining the stamp you can perform read operations, but since another thread may modify the data in the meantime, you must call validate() with the stamp afterwards to confirm that no write lock was acquired since the stamp was issued; if validation fails, you re-read under a proper read lock.

StampedLock uses a concept called "stamps" to represent different lock states. A stamp is an opaque long value returned by lock acquisition methods. By inspecting the value of the stamp, you can determine the lock status and perform conditional operations.

The optimistic read avoids the overhead of acquiring and releasing read locks, at the cost of an occasional fallback to a real read lock when validation fails.

import java.util.concurrent.locks.StampedLock;

public class StampedLockExample {
    private double x, y;
    private final StampedLock lock = new StampedLock();

    public void writeData(double newX, double newY) {
        long stamp = lock.writeLock();
        try {
            x = newX;
            y = newY;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    public double readDistanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();
        double currentX = x;
        double currentY = y;
        if (!lock.validate(stamp)) { // a write lock was acquired in between: fall back
            stamp = lock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}
A stamp returned by any of the acquisition methods can also be released with the mode-agnostic unlock method:

long stamp = stampedLock.writeLock();
stampedLock.unlock(stamp);

Others

Cyclic Barrier

A CyclicBarrier is a reusable construct where a group of threads waits together until all of the threads arrive. At that point, the barrier is broken and an action can optionally be taken.
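A minimal sketch (the party count and printed messages are illustrative):

import java.util.concurrent.CyclicBarrier;

class CyclicBarrierExample {
    public static void main(String[] args) {
        // Barrier for 3 parties; the optional barrier action runs once
        // per generation, when the last thread arrives.
        CyclicBarrier barrier = new CyclicBarrier(3,
                () -> System.out.println("All parties arrived, barrier broken"));

        for (int i = 1; i <= 3; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + id + " waiting at barrier");
                    barrier.await(); // blocks until all 3 threads arrive
                    System.out.println("Thread " + id + " passed the barrier");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}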

Countdown Latch

A CountDownLatch is a synchronization mechanism in Java that allows one or more threads to wait until a set of operations or tasks completes. It is initialized with a count, and each time a task completes, the count is decremented. When the count reaches zero, all threads waiting for it to become zero are released.

import java.util.concurrent.CountDownLatch;

public class CountdownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int numberOfTasks = 3;
        CountDownLatch latch = new CountDownLatch(numberOfTasks);

        // Create and start worker threads
        for (int i = 1; i <= numberOfTasks; i++) {
            Thread workerThread = new Thread(new Worker(i, latch));
            workerThread.start();
        }

        // Wait for all tasks to complete
        latch.await();

        // All tasks completed
        System.out.println("All tasks completed!");
    }
}

class Worker implements Runnable {
    private final int id;
    private final CountDownLatch latch;

    public Worker(int id, CountDownLatch latch) {
        this.id = id;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("Worker " + id + " started");
        try {
            // Simulating some work
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Worker " + id + " completed");
        latch.countDown(); // Decrease the count by 1
    }
}

In this example, we create a CountDownLatch with a count of 3. We then create three worker threads (Worker objects) and start them. Each worker thread performs some work and calls countDown() on the latch, which decrements the count by 1. Finally, the main thread calls await() on the latch, which causes it to block until the count becomes zero.

When all the worker threads have completed their tasks and countDown() has been called three times in total, the count reaches zero and the main thread is released from the await() call. The program then prints "All tasks completed!".

That’s all for this article!

Note that it does not contain everything in the Educative course. I only copied some of the stuff which I felt was more important …
