After a couple of really bad attempts by prospects, I decided to give it a try. I have not done a lot of multi-thread applications (standalone) since most of my work has to do with applications in containers. I wondered if I could do it. Well my results are in the included application. It is just one way of doing it, but I thought I would post the cleaned-up version.
The NetBeans project can be downloaded from here: ThreadedQueueExample.zip
ThreadedQueueExample.java
/* * Copyright 2011 Blue Lotus Software, LLC. * Copyright 2011 John Yeary * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * $Id: ThreadedQueueExample.java 347 2011-05-23 12:22:29Z jyeary $ */ package com.bluelotussoftware.thread.example; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * * @author John Yeary * @version 1.0 */ public class ThreadedQueueExample { /** * @param args the command line arguments */ public static void main(String[] args) { Queue<Message> queue = new ConcurrentLinkedQueue<Message>(); Producer p = new Producer(queue); Consumer c = new Consumer(queue); Thread t1 = new Thread(p); Thread t2 = new Thread(c); t1.start(); t2.start(); } }
Producer.java
/* * Copyright 2011 Blue Lotus Software, LLC. * Copyright 2011 John Yeary * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * $Id: Producer.java 347 2011-05-23 12:22:29Z jyeary $ */ package com.bluelotussoftware.thread.example; import java.util.Queue; import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author John Yeary * @version 1.0 */ public class Producer implements Runnable { private static int ctr; private final Queue<Message> messageQueue; private final Random r; public Producer(Queue<Message> messageQueue) { this.messageQueue = messageQueue; r = new Random(); } @Override public void run() { while (true) { produce(); int wait = r.nextInt(5000); try { Thread.sleep(wait); } catch (InterruptedException ex) { Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); } } } private void produce() { Message m = new Message(++ctr, "Example message."); messageQueue.offer(m); synchronized (messageQueue) { messageQueue.notifyAll(); } System.out.println("Producer: " + m); } }
Consumer.java
/* * Copyright 2011 Blue Lotus Software, LLC. * Copyright 2011 John Yeary * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * $Id: Consumer.java 349 2011-05-23 15:25:32Z jyeary $ */ package com.bluelotussoftware.thread.example; import java.util.Queue; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author John Yeary * @version 1.0 */ public class Consumer implements Runnable { private final Queue<Message> queue; public Consumer(Queue<Message> queue) { this.queue = queue; } @Override public void run() { while (true) { consume(); try { synchronized (queue) { queue.wait(); } } catch (InterruptedException ex) { Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); } } } private void consume() { while (!queue.isEmpty()) { Message m = queue.poll(); if (m != null) { System.out.println("Consumer: " + m.toString()); } } } }
Message.java
/* * Copyright 2011 Blue Lotus Software, LLC. * Copyright 2011 John Yeary * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * $Id: Message.java 347 2011-05-23 12:22:29Z jyeary $ */ package com.bluelotussoftware.thread.example; /** * * @author John Yeary * @version 1.0 */ public class Message { private int id; private String message; public Message(int id, String message) { this.id = id; this.message = message; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "Message{ id:" + id + ",message: " + message + "}"; } }
ConcurrentLinkedQueue - a class I had not used before. Thanks for sharing!
ReplyDeleteI had a correction from a colleague at work to check to see if the Consumer got a null message before calling toString() on it. Nice catch, and an example of the power of free software and more eyes.
ReplyDeleteWhy are you sychronizing on the queue at all? A ConcurrentLinkedQueue by design allows threads to concurrently access it without synchronization by using a 'no-wait' algorithm incorporating sophisticated techniques. Read the Javadoc or better yet, Brian Goetz's Java Concurrency in Practice for a good explanation.
ReplyDeleteAn interesting observation.
ReplyDeleteThe current implementation does use a thread safe queue, but not all implementations of Queue<T> are thread safe.
The example is for general threads not specifically the use of ConcurrentLinkedQueue<T>.
If you look at LinkedList<T> you will notice that the API suggests you use Collections.synchronizedList(List<T> list), or external synchronization methods like the one in this example.
I will concede that this is not the optimal use of ConcurrentLinkedQueue<T> since it is already thread safe. You could remove the synchronization blocks.