BlockingQueue in Java

Blocking Queue

A blocking queue is a queue that blocks the calling thread when it tries to dequeue from an empty queue or to enqueue an item into a full queue. To dequeue is to take an item out of the queue; to enqueue is to put an item into it.

A thread trying to dequeue from an empty queue is blocked until another thread inserts an item into the queue.

A thread trying to enqueue an item into a full queue is blocked until another thread makes space available, either by dequeuing one or more items or by clearing the queue completely.

In Java terms, a BlockingQueue is a Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
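The blocking retrieval described above corresponds to take(), and the blocking insertion to put(). The following is a minimal sketch rather than part of the original example; the class name BlockingBasics and the 200 ms delay are assumptions chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingBasics {

    // Returns roughly how long take() waited (in ms) on an initially empty queue.
    static long timeBlockedTake() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        // Helper thread that inserts an item only after a 200 ms delay.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        long start = System.nanoTime();
        producer.start();
        String item = queue.take(); // blocks until the producer's put()
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        producer.join();

        System.out.println("Took '" + item + "' after about " + waitedMs + " ms");
        return waitedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        timeBlockedTake();
    }
}
```

The calling thread spends the delay parked inside take() instead of polling in a loop. put() behaves symmetrically when a bounded queue is full.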

A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking.

A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE.
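As a small illustration of the two cases, the sketch below compares a bounded ArrayBlockingQueue with a PriorityBlockingQueue, which has no capacity constraint. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class RemainingCapacityDemo {

    // Remaining capacity of a bounded queue after adding `count` items.
    static int boundedRemaining(int capacity, int count) {
        BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < count; i++) {
            bounded.add(i);
        }
        return bounded.remainingCapacity();
    }

    // Remaining capacity of an unbounded queue after adding `count` items.
    static int unboundedRemaining(int count) {
        BlockingQueue<Integer> unbounded = new PriorityBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            unbounded.add(i);
        }
        return unbounded.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(boundedRemaining(3, 2));   // capacity 3 minus 2 items
        System.out.println(unboundedRemaining(5) == Integer.MAX_VALUE);
    }
}
```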

A BlockingQueue does not intrinsically support any kind of “close” or “shutdown” operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
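The poison-object tactic can be sketched as follows. This is one possible arrangement, not a prescribed API: the POISON sentinel and the class name are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {

    // Hypothetical sentinel. It is compared by identity, so it cannot
    // collide with a real item that happens to contain the same text.
    static final String POISON = new String("POISON");

    static List<String> runOnce() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == POISON) {
                        return; // end of stream: stop consuming
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("a");
        queue.put("b");
        queue.put("c");
        queue.put(POISON); // signals that no more items will follow

        consumer.join(); // after join, the consumer's writes to `consumed` are visible
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce());
    }
}
```

With several consumers, each one needs to receive its own poison object, so the producer side would insert one per consumer.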

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
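In practice this guarantee means that an object can be safely handed over through the queue even if its fields are neither volatile nor synchronized. A minimal sketch, with a hypothetical Message holder class:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HappensBeforeDemo {

    // Deliberately plain holder: no volatile, no synchronization.
    static class Message {
        int payload;
    }

    static int publishAndRead() throws InterruptedException {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        int[] seen = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                Message received = queue.take();
                // The write to payload happened-before put(), and put()
                // happens-before this take(), so 42 is visible here.
                seen[0] = received.payload;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        Message message = new Message();
        message.payload = 42; // written before the hand-off
        queue.put(message);

        consumer.join();
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndRead());
    }
}
```

The guarantee covers writes made before the put(). Mutating the object after it has been queued would still need separate synchronization.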

Does BlockingQueue accept null elements?

A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.
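Both halves of this rule can be checked with a short sketch. LinkedBlockingQueue is used here only as a convenient implementation, and the class and method names are illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class NullHandlingDemo {

    // Returns true if offering null is rejected with NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // poll() on an empty queue returns null instead of blocking or throwing.
    static String pollEmpty() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejectsNull());
        System.out.println("poll on empty queue: " + pollEmpty());
    }
}
```

Because null already means "nothing available" for poll(), allowing null elements would make that result ambiguous.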

Are BlockingQueue implementations thread-safe?

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation.

So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.
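The partial failure can be reproduced with a bounded queue that is too small for the collection being added. The sketch below assumes an ArrayBlockingQueue of capacity 2; the class name is made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PartialAddAllDemo {

    // Tries to add three elements to a queue with room for two and
    // returns whatever ended up in the queue.
    static List<String> contentsAfterFailedAddAll() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList("a", "b", "c"));
        } catch (IllegalStateException e) {
            // Thrown when "c" does not fit; "a" and "b" were already
            // added and are not rolled back.
            System.out.println("addAll failed: " + e.getMessage());
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(contentsAfterFailedAddAll());
    }
}
```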

Why BlockingQueue?

BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface.

So, for example, it is possible to remove an arbitrary element from a queue using remove(x).

However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
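A cancellation of this kind might look like the following sketch, where the message names are placeholders:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RemoveDemo {

    // Removes one queued message and returns the messages still waiting.
    static List<String> cancel(String cancelled) {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("msg-1", "msg-2", "msg-3"));

        // remove(Object) has to search the queue for the element,
        // which is why it is meant for occasional use only.
        boolean removed = queue.remove(cancelled);
        System.out.println("removed " + cancelled + ": " + removed);

        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(cancel("msg-2"));
    }
}
```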

The main advantage is that a BlockingQueue provides a correct, thread-safe implementation. The “blocking” nature of the queue has a couple of advantages.

First, on adding elements, if the queue capacity is limited, memory consumption is limited as well.

Also, if the queue consumers get too far behind producers, the producers are naturally throttled since they have to wait to add elements.
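The throttling effect can be observed with a fast producer and a deliberately slow consumer. The following is a rough sketch under assumed values (capacity 2, ten items, a 10 ms consumer delay); the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BackpressureDemo {

    // Returns the largest queue size the producer observed.
    static int maxObservedSize(int capacity, int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        // Consumer that takes one item roughly every 10 ms.
        Thread slowConsumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    Thread.sleep(10);
                    queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowConsumer.start();

        int max = 0;
        for (int i = 0; i < items; i++) {
            // Blocks whenever the queue already holds `capacity` items,
            // so the producer is slowed down to the consumer's pace.
            queue.put(i);
            max = Math.max(max, queue.size());
        }

        slowConsumer.join();
        return max;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max queue size: " + maxObservedSize(2, 10));
    }
}
```

However fast the producer loop runs, the queue never holds more than its capacity, which is what bounds memory use.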

When taking elements from the queue, the main advantage is simplicity; waiting forever is trivial, and correctly waiting for a specified time-out is only a little more complicated.
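Waiting forever is take(); waiting with a time-out is poll(timeout, unit), which returns null if nothing arrives in time. A minimal sketch, where takeOrGiveUp is a hypothetical helper name:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedPollDemo {

    // Waits up to timeoutMs for an item; returns null if none arrives in time.
    static String takeOrGiveUp(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Empty queue: gives up after about 100 ms.
        System.out.println(takeOrGiveUp(queue, 100));

        // Non-empty queue: returns the item without waiting.
        queue.put("job-1");
        System.out.println(takeOrGiveUp(queue, 100));
    }
}
```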

More information can be found in Oracle's documentation for the BlockingQueue interface.

The usage example is based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
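As a quick, informal check of the multiple-producer, multiple-consumer claim, the sketch below lets several threads share one queue and counts the distinct items that come out. The class name, the thread counts and the use of a CountDownLatch for completion are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class MultiProducerConsumerDemo {

    static int distinctItemsConsumed(int producers, int consumers, int perProducer)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch remaining = new CountDownLatch(producers * perProducer);

        List<Thread> consumerThreads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        seen.add(queue.take());
                        remaining.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // asked to stop
                }
            });
            consumerThreads.add(t);
            t.start();
        }

        for (int p = 0; p < producers; p++) {
            final int base = p * perProducer; // each producer gets its own number range
            new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put(base + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        remaining.await(); // wait until every item has been taken
        for (Thread t : consumerThreads) {
            t.interrupt(); // consumers are blocked in take(); wake them so they exit
        }
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctItemsConsumed(2, 2, 100));
    }
}
```

If the number of distinct items equals the number produced, no item was lost and none was handed to two consumers.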

Producer

Create the Producer class below, which produces an item and puts it into the queue once every 1000 milliseconds.

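import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Item> queue;

    public Producer(BlockingQueue<Item> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String uuid = UUID.randomUUID().toString();

                System.out.println("Produced Item Id : " + uuid);

                // put() blocks if the queue is bounded and currently full.
                Item item = new Item(uuid);
                queue.put(item);

                // Produce one item per second.
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop producing.
            Thread.currentThread().interrupt();
        }
    }

}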

The purpose of the producer is to produce an item which is put into the queue.

Consumer

Create the Consumer class below, which consumes items from the queue.

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Item> queue;

    public Consumer(BlockingQueue<Item> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks while the queue is empty.
                Item item = queue.take();

                System.out.println("Consumed Item Id : " + item.getId());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop consuming.
            Thread.currentThread().interrupt();
        }
    }

}

The purpose of a consumer is to consume an item from the queue.

POJO Class

The Item class is a simple POJO (Plain Old Java Object) class, which is given below.

public class Item {

    private String id;

    public Item(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

}

Main Class

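Create the main class to test the producer and the consumers with a LinkedBlockingQueue. LinkedBlockingQueue orders its elements FIFO (first in, first out), so items are taken from the queue in the same order in which the producer put them. Because the queue is created without a capacity argument, its capacity is Integer.MAX_VALUE and put() effectively never blocks in this example. Each item is delivered to exactly one of the two consumers.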

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueApp {

    public static void main(String[] args) {
        // One queue shared by the producer and both consumers.
        BlockingQueue<Item> queue = new LinkedBlockingQueue<>();

        Producer producer = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
    }

}

Testing the Blocking Queue

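Running the above main class produces output similar to the following. The UUIDs are random, so the actual values differ on every run, and the program keeps running until it is stopped.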

Produced Item Id : c35c4c8f-f533-4b30-b216-acfc4be7c4a1
Consumed Item Id : c35c4c8f-f533-4b30-b216-acfc4be7c4a1
Produced Item Id : 7d62f77f-fac0-493c-8a6e-06319b13933f
Consumed Item Id : 7d62f77f-fac0-493c-8a6e-06319b13933f
Produced Item Id : 9727fef4-aee6-4d10-9cfd-0826545f0c32
Consumed Item Id : 9727fef4-aee6-4d10-9cfd-0826545f0c32
Produced Item Id : 119edd81-abbb-40b5-a148-4a13d22c6acc
Consumed Item Id : 119edd81-abbb-40b5-a148-4a13d22c6acc
Produced Item Id : e725a909-080f-453e-81db-b1ad7d618e5c
Consumed Item Id : e725a909-080f-453e-81db-b1ad7d618e5c
Produced Item Id : edf027f1-af71-4a52-8e58-7e197371369b
Consumed Item Id : edf027f1-af71-4a52-8e58-7e197371369b
Produced Item Id : 98c02377-05ca-4fa1-934e-fe8b2ff225fa
Consumed Item Id : 98c02377-05ca-4fa1-934e-fe8b2ff225fa
...more

Notice that the items are consumed in the same order in which the producer produced them.
