CSC 225 Programming Project 7 – Consumer/Producer in a fair and unfair ArrayBlockingQueue

The ConsumerProducer.java class included in this folder illustrates a solution to the Consumer/Producer problem, where a Producer task writes data to a queue while a Consumer task simultaneously reads it. The example code uses a lock with two conditions to synchronize the two threads as they interact with a LinkedList.

For this project, create a producer task and a consumer task that write and read data to/from an ArrayBlockingQueue with a capacity of 10. This type of queue is thread-safe, meaning that you do not need to use a lock to synchronize your threads. The producer should write 1000 pieces of data to the queue (e.g., Integers 1 to 1000, or Character ‘x’ 1000 times). After each datum is written, determine how many elements are in the queue. After the tasks terminate, the program displays how often the queue was each possible size. The program should do this for a “fair” ArrayBlockingQueue and an “unfair” one.

Sample Output:

(un)fair ABQ:

Size Frequency

0 89

1 75

2 95

3 102

4 80

5 92

6 90

7 81

8 109

9 96

10 91

(un)fair ABQ:

Size Frequency

0 259

1 90

2 6

3 10

4 3

5 5

6 30

7 46

8 50

9 96

10 405

import java.util.concurrent.*;

import java.util.concurrent.locks.*;

import java.util.*;

/**
 * Demonstrates the producer/consumer problem with an explicit lock and two
 * conditions.
 *
 * <p>A producer task writes the integers 1 through {@code ITEM_COUNT} into a
 * bounded {@link Buffer} while a consumer task concurrently reads the same
 * number of values back out. Each task pauses for a random time after every
 * operation so that the interleaving of the two threads is visible on the
 * console.
 */
public class ConsumerProducer {

    /** Number of values the producer writes (1..ITEM_COUNT) and the consumer reads. */
    private static final int ITEM_COUNT = 999;

    /**
     * Exclusive upper bound, in milliseconds, of the random pause taken after
     * each write or read. Lower this value for a quicker demonstration run.
     */
    private static final int MAX_DELAY_MILLIS = 10000;

    /** The single buffer shared by the producer and the consumer. */
    private static final Buffer buffer = new Buffer();

    /**
     * Starts one producer and one consumer on a two-thread pool.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(new ProducerTask());
        executor.execute(new ConsumerTask());
        // No further tasks are submitted; the pool threads end once both tasks finish.
        executor.shutdown();
    }

    /**
     * Sleeps for a random duration in {@code [0, MAX_DELAY_MILLIS)} milliseconds.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void pauseRandomly() throws InterruptedException {
        // Scale first, then narrow: casting Math.random() directly always yields 0.
        Thread.sleep((long) (Math.random() * MAX_DELAY_MILLIS));
    }

    /** Writes the values 1 through {@code ITEM_COUNT} to the shared buffer. */
    private static class ProducerTask implements Runnable {
        @Override
        public void run() {
            try {
                for (int value = 1; value <= ITEM_COUNT; value++) {
                    System.out.println("Producer writes " + value);
                    buffer.write(value);
                    pauseRandomly();
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the executor can observe it, then stop.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reads {@code ITEM_COUNT} values from the shared buffer, i.e. exactly as
     * many as the producer writes, so the task (and the JVM) can terminate.
     */
    private static class ConsumerTask implements Runnable {
        @Override
        public void run() {
            try {
                for (int remaining = ITEM_COUNT; remaining > 0; remaining--) {
                    System.out.println("\t\t\t\t\t\t\t\t\t\tConsumer reads " + buffer.read());
                    pauseRandomly();
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the executor can observe it, then stop.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * A bounded FIFO buffer of integers guarded by a lock with two conditions:
     * writers wait on {@code notFull}, readers wait on {@code notEmpty}.
     */
    private static class Buffer {

        private static final int CAPACITY = 1; // buffer size

        private final Queue<Integer> queue = new LinkedList<Integer>();

        // The lock and its conditions are per-instance because they guard the
        // per-instance queue above.
        private final Lock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();

        /**
         * Appends {@code value} to the buffer, waiting while the buffer is full.
         *
         * @param value the value to store
         * @throws InterruptedException if interrupted while waiting for space;
         *         in that case the value is not stored
         */
        public void write(int value) throws InterruptedException {
            lock.lock();
            try {
                // Loop rather than 'if': the condition must be re-checked after every wake-up.
                while (queue.size() == CAPACITY) {
                    System.out.println("Wait for notFull condition");
                    notFull.await();
                }
                queue.offer(value);
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the oldest value, waiting while the buffer is empty.
         *
         * @return the value at the head of the buffer
         * @throws InterruptedException if interrupted while waiting for a value
         */
        public int read() throws InterruptedException {
            lock.lock();
            try {
                // Loop rather than 'if': the condition must be re-checked after every wake-up.
                while (queue.isEmpty()) {
                    System.out.println("\t\t\t\t\t\t\t\t\t\tWait for notEmpty condition");
                    notEmpty.await();
                }
                int value = queue.remove();
                notFull.signal();
                return value;
            } finally {
                // Only release the lock here; returning from 'finally' would hide exceptions.
                lock.unlock();
            }
        }
    }
}