I have written a publisher-subscriber system for a multi-threaded project I am working on. It is my understanding that when using SynchronizedList structural operations are supposed to be thread-safe, but iteration requires that the list be wrapped in a
synchronized
block.
For these reasons, I can't figure out why I'm getting a comod exception here. It appears that I have taken all the necessary precautions to prevent this: all iterations are in synchronized blocks, and the list itself is private so it can't be accessed from outside the class.
Relevant code is below:
public class Publisher {
private List<subscription> subscriptions =
Collections.synchronizedList(new ArrayList<>());
private static final Publisher instance = new Publisher();
private Publisher() {
}
public Subscription subscribe(Subscriber sub,
Class type, Predicate<message> filter) {
Subscription ret = new Subscription();
ret.subscriber = sub;
ret.messageType = type;
ret.filter = filter;
System.out.println("Subscribing " + sub + " to " + type.toString());
synchronized (this.subscriptions) {
this.subscriptions.add(ret);
}
return ret;
}
public void unsubscribe(Subscription s) {
System.out.println("Unsubscribing " + s.subscriber + " from " + s.messageType.toString());
synchronized (subscriptions) {
this.subscriptions.remove(s);
}
}
public void publish(Message msg) {
System.out.println("Publishing" + msg.toString());
synchronized (subscriptions) {
Iterator<subscription> it = subscriptions.iterator();
while (it.hasNext()) {
Subscription sub = it.next();
if (sub.test(msg)) {
sub.subscriber.receiveMessage(msg);
}
}
}
}
public void cancelAllSubscriptions(Subscriber sub) {
System.out.println("Unsubscribing " + sub.toString());
ArrayList<subscription> toRemove = new ArrayList<>();
synchronized(this.subscriptions) {
Iterator<subscription> it = subscriptions.iterator();
while (it.hasNext()) {
Subscription s = it.next();
if (s.subscriber == sub) toRemove.add(s);
}
this.subscriptions.removeAll(toRemove);
}
}
}
What I have tried:
I've tried removing the synchronized blocks on the list modification methods; I have consulted
here, Oracle's documentation on synchronized collections, and even putting a volatile 'busy' variable in that hangs the thread until busy == false, then sets busy to true before processing and to false when finished (that just ended up making one of my threads hang, and it's not a good solution anyway). I've toyed with the idea of some kind of task queue, but before I go to that trouble I thought I would ask the community whether there is something obvious that I'm missing.
Thank you in advance for your help.