I have written a publisher-subscriber system for a multi-threaded project I am working on. It is my understanding that with Collections.synchronizedList the individual structural operations (add, remove, and so on) are thread-safe, but iteration must be wrapped in a block that is synchronized on the list.

For these reasons, I can't figure out why I'm getting a ConcurrentModificationException here. It appears that I have taken all the necessary precautions to prevent this: all iterations are in synchronized blocks, and the list itself is private, so it can't be accessed from outside the class.

Relevant code is below:
Java
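import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/**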
 * Intermediary which receives messages and distributes them to subscribers
 * that have expressed interest in those messages.
 * @author Kevin J. Burns
 *
 */
public class Publisher {
	private List<Subscription> subscriptions = 
			Collections.synchronizedList(new ArrayList<>());
	private static final Publisher instance = new Publisher();
	
	private Publisher() {
		// Does nothing special except prevent instantiation.
	}
	
	/**
	 * Registers a subscription to a subset of messages.
	 * @param sub An object that can receive a message when sent
	 * @param type RTTI info for the message type of interest.
	 * @param filter A filter to use to determine whether a future message
	 * matches this subscription 
	 * @return a reference to the subscription, in case subscribers wish to
	 * keep track of their subscriptions
	 */
	public Subscription subscribe(Subscriber sub, 
			Class type, Predicate<Message> filter) {
		/*
		 * Implementation note:
		 * While it would be good to make the generic clause more specific
		 * (thereby reducing type-checking code in filters), I'm not sure how 
		 * to instantiate generics from RTTI info.
		 */
		Subscription ret = new Subscription();
		ret.subscriber = sub;
		ret.messageType = type;
		ret.filter = filter;
		
		System.out.println("Subscribing " + sub + " to " + type.toString());
		synchronized (this.subscriptions) {
			this.subscriptions.add(ret);
		}
		return ret;
	}
	
	/**
	 * Removes a subscription.
	 * @param s The subscription to remove.
	 */
	public void unsubscribe(Subscription s) {
		System.out.println("Unsubscribing " + s.subscriber + " from " + s.messageType.toString());
		synchronized (subscriptions) {
			this.subscriptions.remove(s);
		}
	}
	
	/**
	 * Causes a message to be published to appropriate subscribers
	 * @param msg the message to publish
	 */
	public void publish(Message msg) {
		System.out.println("Publishing " + msg.toString());
		synchronized (subscriptions) {
			Iterator<Subscription> it = subscriptions.iterator();
			
			while (it.hasNext()) {
				Subscription sub = it.next();
				if (sub.test(msg)) {
					sub.subscriber.receiveMessage(msg);
				}
			}
		}
	}
	
	/**
	 * Cancels all subscriptions belonging to a particular subscriber. If this
	 * is used, the subscriber does not have to keep up with subscriptions.
	 * @param sub Subscriber to remove
	 */
	public void cancelAllSubscriptions(Subscriber sub) {
		System.out.println("Unsubscribing " + sub.toString());
		ArrayList<Subscription> toRemove = new ArrayList<>();
		
		synchronized(this.subscriptions) {
			Iterator<Subscription> it = subscriptions.iterator();
			
			while (it.hasNext()) {
				Subscription s = it.next();
				if (s.subscriber == sub) toRemove.add(s);
			}
			this.subscriptions.removeAll(toRemove);
		}
	}
}


What I have tried:

I've tried removing the synchronized blocks on the list modification methods, and I have searched here and read Oracle's documentation on synchronized collections. I even tried adding a volatile 'busy' variable that makes the thread wait until busy == false, then sets busy to true before processing and back to false when finished (that just ended up making one of my threads hang, and it's not a good solution anyway). I've toyed with the idea of some kind of task queue, but before I go to that trouble I thought I would ask the community whether there is something obvious that I'm missing.

Thank you in advance for your help.
Posted
Updated 20-Oct-16 5:08am
Comments
Richard MacCutchan 20-Oct-16 12:53pm    
All your code looks OK, but I cannot actually test it. I can only suggest you use the debugger to try and capture more information when the exception occurs.
Henrik Jonsson 20-Oct-16 15:18pm    
Check the stack trace to see whether you end up in a call to remove or add while you are iterating from the same thread.
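
To illustrate the same-thread case Henrik describes, here is a minimal sketch. It assumes (this is not confirmed anywhere in the thread) that a receiveMessage() callback ends up calling unsubscribe() while publish() is still iterating. The class name and the string "subscriptions" are hypothetical stand-ins. Because Java monitors are reentrant, the synchronized block does not stop the thread that already holds the lock from modifying the list mid-iteration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

class ReentrantComodDemo {
    /** Returns true if modifying the list from inside the iteration throws. */
    static boolean modifyWhileIterating() {
        List<String> subs = Collections.synchronizedList(new ArrayList<>());
        subs.add("a");
        subs.add("b");
        subs.add("c");
        try {
            synchronized (subs) {               // same pattern as publish()
                for (String s : subs) {
                    if (s.equals("a")) {
                        // Hypothetical stand-in for a callback that unsubscribes.
                        // This thread already owns the lock, so it is not blocked.
                        synchronized (subs) {
                            subs.remove("c");
                        }
                    }
                }
            }
        } catch (ConcurrentModificationException e) {
            return true;                        // the iterator detected the change
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + modifyWhileIterating());
    }
}
```

If the stack trace shows add or remove underneath publish() on the same thread, this is likely the mechanism, and no amount of extra locking on the list will prevent it.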
kjburns1980 20-Oct-16 16:56pm    
Thank you for your help. Henrik, the problem was occurring in a lambda running on a task queue. The code path is pretty straightforward and I wasn't able to find anything suspicious, so it's entirely possible that the problem is in the task queue itself.

I found CopyOnWriteArrayList. It is reportedly only a good fit when iteration makes up about 90% of the activity on the list. For this purpose iteration is at about 65%, which I can live with for now. Plus, if the problem is in the task queue, this will at least take care of things so I can move on.

Thank you both for your answers.
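
For readers landing here later, a rough sketch of why CopyOnWriteArrayList makes the exception go away: its iterators walk a snapshot of the backing array taken when the iterator was created, so a modification during iteration (even from the iterating thread) is never seen by that iterator. The class name and string elements below are hypothetical stand-ins for the Publisher's subscriptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowPublishDemo {
    /** Returns {number of elements visited, list size afterwards}. */
    static int[] deliverAndUnsubscribe() {
        List<String> subs = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        int delivered = 0;
        for (String s : subs) {          // iterates over a snapshot of the array
            delivered++;
            if (s.equals("a")) {
                subs.remove("c");        // copies the array; the snapshot is unaffected
            }
        }
        return new int[] { delivered, subs.size() };
    }

    public static void main(String[] args) {
        int[] r = deliverAndUnsubscribe();
        System.out.println("visited=" + r[0] + ", remaining=" + r[1]);
    }
}
```

One consequence worth keeping in mind: the loop still visits "c" after it has been removed, so a subscriber that unsubscribes during a publish may still receive that in-flight message. Every write also copies the whole array, which is where the advice about iteration-heavy workloads comes from.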

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)