CSC 225 Programming Project 7 – Consumer/Producer in a fair and unfair ArrayBlockingQueue
The ConsumerProducer.java class included in this folder illustrates a solution to the Consumer/Producer problem, where a Producer task writes data to a queue while a Consumer task simultaneously reads it. The example code uses a lock with two conditions to synchronize the two threads as they interact with a LinkedList.
For this project, create a producer task and a consumer task that write data to and read data from an ArrayBlockingQueue with a capacity of 10. This type of queue is thread-safe, meaning that you do not need to use a lock to synchronize your threads. The producer should write 1000 pieces of data to the queue (e.g., Integers 1 to 1000, or Character ‘x’ 1000 times). After each datum is written, determine how many elements are in the queue. After the tasks terminate, the program should display how often the queue held each possible number of elements (0 through 10). The program should do this for a “fair” ArrayBlockingQueue and an “unfair” one.
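As a starting point, here is a minimal sketch of the ArrayBlockingQueue calls the description relies on. The class name AbqBasics and the helper newQueue are hypothetical names chosen for illustration; the two-argument constructor takes the capacity and a fairness flag, and put/take block when the queue is full or empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AbqBasics {
    // Hypothetical helper (not part of the assignment files):
    // capacity 10 as in the project text, fairness chosen by the caller.
    public static BlockingQueue<Integer> newQueue(boolean fair) {
        return new ArrayBlockingQueue<>(10, fair);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = newQueue(true); // "fair" queue

        queue.put(1); // put() blocks only while the queue is full
        queue.put(2);
        System.out.println("size = " + queue.size());
        System.out.println("free = " + queue.remainingCapacity());

        int first = queue.take(); // take() blocks only while the queue is empty
        System.out.println("took " + first + ", size = " + queue.size());
    }
}
```

With a fair queue, threads blocked in put or take are served in the order they started waiting; an unfair queue makes no such ordering guarantee.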
Sample Output:
(un)fair ABQ:
Size    Frequency
0       89
1       75
2       95
3       102
4       80
5       92
6       90
7       81
8       109
9       96
10      91
(un)fair ABQ:
Size    Frequency
0       259
1       90
2       6
3       10
4       3
5       5
6       30
7       46
8       50
9       96
10      405
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.*;

public class ConsumerProducer {
    private static Buffer buffer = new Buffer();

    public static void main(String[] args) {
        // One pool thread each for the producer and the consumer
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(new ProducerTask());
        executor.execute(new ConsumerTask());
        executor.shutdown();
    }

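    // Writes increasing integers to the shared buffer
    private static class ProducerTask implements Runnable {
        public void run() {
            try {
                int i = 1;
                while (i < 1000) {
                    System.out.println("Producer writes " + i);
                    buffer.write(i++); // waits while the buffer is full
                    // Cast the product, not Math.random() alone, which would always give 0
                    Thread.sleep((int) (Math.random() * 10000));
                }
            }
            catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }
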
    // Reads values from the shared buffer indefinitely
    private static class ConsumerTask implements Runnable {
        public void run() {
            try {
                while (true) {
                    System.out.println("\t\t\t\t\t\t\t\t\t\tConsumer reads " + buffer.read());
                    // Cast the product, not Math.random() alone, which would always give 0
                    Thread.sleep((int) (Math.random() * 10000));
                }
            }
            catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    // Bounded buffer guarded by one lock with two conditions
    private static class Buffer {
        private static final int CAPACITY = 1; // buffer size
        private LinkedList<Integer> queue = new LinkedList<>();
        private static Lock lock = new ReentrantLock();
        private static Condition notEmpty = lock.newCondition();
        private static Condition notFull = lock.newCondition();

        public void write(int value) {
            lock.lock();
            try {
                // Re-check the condition in a loop after every wake-up
                while (queue.size() == CAPACITY) {
                    System.out.println("Wait for notFull condition");
                    notFull.await();
                }
                queue.offer(value);
                notEmpty.signal(); // wake a waiting reader
            }
            catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            finally {
                lock.unlock();
            }
        }

        public int read() {
            int value = 0;
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    System.out.println("\t\t\t\t\t\t\t\t\t\tWait for notEmpty condition");
                    notEmpty.await();
                }
                value = queue.remove();
                notFull.signal(); // wake a waiting writer
            }
            catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            finally {
                lock.unlock();
            }
            // Return outside finally so unchecked exceptions are not swallowed
            return value;
        }
    }
}
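
For the project itself, the size tally described in the assignment could be sketched along the following lines. This is one possible structure under stated assumptions, not a required design: the class name QueueSizeTally, the method run, and the constants are hypothetical, and the tasks are submitted as lambdas with Future.get() used to wait for them to finish. Note that size() is read just after put() returns, so the consumer may already have removed elements by then; each recorded value is a snapshot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueSizeTally {
    static final int CAPACITY = 10; // queue capacity from the project description
    static final int ITEMS = 1000;  // number of data items from the project description

    // Hypothetical helper: runs one producer and one consumer and returns
    // how often each queue size (0..CAPACITY) was observed after a write.
    public static int[] run(boolean fair) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY, fair);
        int[] frequency = new int[CAPACITY + 1]; // updated only by the producer

        Runnable producer = () -> {
            try {
                for (int i = 1; i <= ITEMS; i++) {
                    queue.put(i);              // blocks while the queue is full
                    frequency[queue.size()]++; // snapshot of the size after the write
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < ITEMS; i++) {
                    queue.take();              // blocks while the queue is empty
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> produced = executor.submit(producer);
            Future<?> consumed = executor.submit(consumer);
            produced.get(); // wait for both tasks before reading the tally
            consumed.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            executor.shutdown();
        }
        return frequency;
    }

    public static void main(String[] args) {
        for (boolean fair : new boolean[] {true, false}) {
            int[] frequency = run(fair);
            System.out.println((fair ? "fair" : "unfair") + " ABQ:");
            System.out.println("Size\tFrequency");
            for (int size = 0; size < frequency.length; size++) {
                System.out.println(size + "\t" + frequency[size]);
            }
        }
    }
}
```

The frequencies vary from run to run because they depend on thread scheduling, but each table should total 1000, one count per write.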