java-technotes

Sunday, October 11, 2015

Producer consumer problem using wait and notify

We have a shared Queue and two threads called Producer and Consumer. Producer thread puts number into shared queue and Consumer thread consumes numbers from shared bucket. Condition is that once an item is produced, consumer thread has to be notified and similarly after consumption producer thread needs to be notified. This inter thread communication is achieved using wait and notify method. Remember wait and notify method is defined in object class, and they are must be called inside synchronized block.

package concurrency;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.log4j.Logger;

public class InterThreadCommunicationExample {

    public static void main(String args[]) {

        final Queue sharedQ = new LinkedList();

        Thread producer = new Producer(sharedQ);
        Thread consumer = new Consumer(sharedQ);

        producer.start();
        consumer.start();

    }
}

public class Producer extends Thread {
    private static final Logger logger = Logger.getLogger(Producer.class);
    private final Queue sharedQ;

    public Producer(Queue sharedQ) {
        super("Producer");
        this.sharedQ = sharedQ;
    }

    @Override
    public void run() {

        for (int i = 0; i < 4; i++) {

            synchronized (sharedQ) {
                //waiting condition - wait until Queue is not empty
                while (sharedQ.size() >= 1) {
                    try {
                        logger.debug("Queue is full, waiting");
                        sharedQ.wait();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                logger.debug("producing : " + i);
                sharedQ.add(i);
                sharedQ.notify();
            }
        }
    }
}

public class Consumer extends Thread {
    private static final Logger logger = Logger.getLogger(Consumer.class);
    private final Queue sharedQ;

    public Consumer(Queue sharedQ) {
        super("Consumer");
        this.sharedQ = sharedQ;
    }

    @Override
    public void run() {
        while(true) {

            synchronized (sharedQ) {
                //waiting condition - wait until Queue is not empty
                while (sharedQ.size() == 0) {
                    try {
                        logger.debug("Queue is empty, waiting");
                        sharedQ.wait();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                int number = sharedQ.poll();
                logger.debug("consuming : " + number );
                sharedQ.notify();
              
                //termination condition
                if(number == 3){break; }
            }
        }
    }
}


Producer Consumer Design Pattern with Blocking Queue

BlockingQueue amazingly simplifies implementation of Producer-Consumer design pattern by providing out of the box support of blocking on put() and take(). Developer doesn't need to write confusing and critical piece of wait-notify code to implement communication. BlockingQuue is an interface and Java 5 provides different implantation like ArrayBlockingQueue and LinkedBlockingQueue , both implement FIFO order or elements, while ArrayLinkedQueue is bounded in nature LinkedBlockingQueue is optionally bounded. here is a complete code example of Producer Consumer pattern with BlockingQueue. Compare it with classic wait notify code, its much simpler and easy to understand.


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){
  
     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();
 
     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }
 
}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
  
    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
  
  
}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9

You see Producer Thread  produced number and Consumer thread consumes it in FIFO order because blocking queue allows elements to be accessed in FIFO.


Sorting objects by multiple attributes (Java Multilevel Sorting using Apache Collections Framework)

The best way to do this is by using the ComparatorChain from the Apache Collections Framework.
You have to implement for every attribute a Comparator that you add to the Chain in the order you need. Now you can use the chain like a normal Comparator.


Here is how it looks like in code:

 package org.test.comparator;  
 public class Person {  
      public Person(String name, int age) {  
           this.name = name;  
           this.age = age;  
      }  
      public String name;  
      public Integer age;  
      @Override  
      public String toString() {  
           return "[" + name + "|" + age + "]";  
      }  
 }  

 package org.test.comparator;  
   
 import java.util.ArrayList;  
 import java.util.Collections;  
 import java.util.Comparator;  
 import java.util.List;  
   
 import org.apache.commons.collections.comparators.ComparatorChain;  
   
 public class TestComparatorChain {  
      public static void main(String[] args) {  
           List<Person> persons = new ArrayList<Person>();  
           persons.add(new Person("stan", 31));  
           persons.add(new Person("kyle", 22));  
           persons.add(new Person("stan", 11));  
           persons.add(new Person("kyle", 30));  
             
           Comparator<Person> comparatorName = new Comparator<Person>() {  
                @Override  
                public int compare(Person o1, Person o2) {  
                     return o1.name.compareToIgnoreCase(o2.name);  
                }  
           };  
             
           Comparator<Person> comparatorAge = new Comparator<Person>() {  
                @Override  
                public int compare(Person o1, Person o2) {  
                     return o1.age.compareTo(o2.age);  
                }  
           };  
             
           ComparatorChain chain = new ComparatorChain();  
           chain.addComparator(comparatorName);  
           chain.addComparator(comparatorAge);  
             
           System.out.println(persons);  
             
           Collections.sort(persons, chain);  
             
           System.out.println(persons);  
      }  
 }  

Resulting two lines:
[[stan|31], [kyle|22], [stan|11], [kyle|30]]
[[kyle|22], [kyle|30], [stan|11], [stan|31]]

The commons-collections-3.2.1.jar is with 575 kb quite big if you only want to use one class of it.