Pages

Monday, May 23, 2011

Threaded Queue Example - ConcurrentLinkedQueue

We were giving interviews to some prospective developers for our team at work. One of the questions/examples we asked for was to create a multi-threaded application which has a producer thread and a consumer thread which passes messages between them.

After a couple of really bad attempts by prospects, I decided to give it a try. I have not done a lot of multi-thread applications (standalone) since most of my work has to do with applications in containers. I wondered if I could do it. Well my results are in the included application. It is just one way of doing it, but I thought I would post the cleaned-up version.

The NetBeans project can be downloaded from here: ThreadedQueueExample.zip

ThreadedQueueExample.java


/*
 * Copyright 2011 Blue Lotus Software, LLC.
 * Copyright 2011 John Yeary
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * $Id: ThreadedQueueExample.java 347 2011-05-23 12:22:29Z jyeary $
 */
package com.bluelotussoftware.thread.example;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 *
 * @author John Yeary
 * @version 1.0
 */
public class ThreadedQueueExample {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {

        Queue<Message> queue = new ConcurrentLinkedQueue<Message>();

        Producer p = new Producer(queue);
        Consumer c = new Consumer(queue);

        Thread t1 = new Thread(p);
        Thread t2 = new Thread(c);


        t1.start();
        t2.start();
    }
}

Producer.java


/*
 * Copyright 2011 Blue Lotus Software, LLC.
 * Copyright 2011 John Yeary
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * $Id: Producer.java 347 2011-05-23 12:22:29Z jyeary $
 */
package com.bluelotussoftware.thread.example;

import java.util.Queue;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author John Yeary
 * @version 1.0
 */
public class Producer implements Runnable {

    private static int ctr;
    private final Queue<Message> messageQueue;
    private final Random r;

    public Producer(Queue<Message> messageQueue) {
        this.messageQueue = messageQueue;
        r = new Random();
    }

    @Override
    public void run() {
        while (true) {
            produce();
            int wait = r.nextInt(5000);
            try {
                Thread.sleep(wait);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    private void produce() {
        Message m = new Message(++ctr, "Example message.");
        messageQueue.offer(m);
        synchronized (messageQueue) {
            messageQueue.notifyAll();
        }
        System.out.println("Producer: " + m);
    }
}

Consumer.java


/*
 * Copyright 2011 Blue Lotus Software, LLC.
 * Copyright 2011 John Yeary
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * $Id: Consumer.java 349 2011-05-23 15:25:32Z jyeary $
 */
package com.bluelotussoftware.thread.example;

import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author John Yeary
 * @version 1.0
 */
public class Consumer implements Runnable {

    private final Queue<Message> queue;

    public Consumer(Queue<Message> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            consume();
            try {
                synchronized (queue) {
                    queue.wait();
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    private void consume() {
        while (!queue.isEmpty()) {
            Message m = queue.poll();
            if (m != null) {
                System.out.println("Consumer: " + m.toString());
            }
        }
    }
}

Message.java


/*
 * Copyright 2011 Blue Lotus Software, LLC.
 * Copyright 2011 John Yeary
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * $Id: Message.java 347 2011-05-23 12:22:29Z jyeary $
 */
package com.bluelotussoftware.thread.example;

/**
 *
 * @author John Yeary
 * @version 1.0
 */
public class Message {

    private int id;
    private String message;

    public Message(int id, String message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Message{ id:" + id + ",message: " + message + "}";
    }
}


Enhanced by Zemanta

4 comments:

  1. ConcurrentLinkedQueue - a class I had not used before. Thanks for sharing!

    ReplyDelete
  2. I had a correction from a colleague at work to check to see if the Consumer got a null message before calling toString() on it. Nice catch, and an example of the power of free software and more eyes.

    ReplyDelete
  3. Why are you sychronizing on the queue at all? A ConcurrentLinkedQueue by design allows threads to concurrently access it without synchronization by using a 'no-wait' algorithm incorporating sophisticated techniques. Read the Javadoc or better yet, Brian Goetz's Java Concurrency in Practice for a good explanation.

    ReplyDelete
  4. An interesting observation.

    The current implementation does use a thread safe queue, but not all implementations of Queue<T> are thread safe.

    The example is for general threads not specifically the use of ConcurrentLinkedQueue<T>.

    If you look at LinkedList<T> you will notice that the API suggests you use Collections.synchronizedList(List<T> list), or external synchronization methods like the one in this example.

    I will concede that this is not the optimal use of ConcurrentLinkedQueue<T> since it is already thread safe. You could remove the synchronization blocks.

    ReplyDelete