1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.ConcurrentHashSet;
25
26 import java.util.Iterator;
27 import java.util.Set;
28 import java.util.concurrent.ThreadPoolExecutor;
29
30
38 public abstract class IteratorDispatcherDestination extends BaseDestination {
39
40 public IteratorDispatcherDestination(String name) {
41 super(name);
42 }
43
44 public IteratorDispatcherDestination(
45 String name, int workersCoreSize, int workersMaxSize) {
46
47 super(name, workersCoreSize, workersMaxSize);
48 }
49
50 public void register(MessageListener listener) {
51 listener = new InvokerMessageListener(listener);
52
53 _listeners.add(listener);
54
55 setListenerCount(_listeners.size());
56 }
57
58 public void send(Message message) {
59 if (_listeners.size() == 0) {
60 if (_log.isDebugEnabled()) {
61 _log.debug("No listeners for destination " + getName());
62 }
63
64 return;
65 }
66
67 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
68
69 if (threadPoolExecutor.isShutdown()) {
70 throw new IllegalStateException(
71 "Destination " + getName() + " is shutdown and cannot " +
72 "receive more messages");
73 }
74
75 dispatch(_listeners.iterator(), message);
76 }
77
78 public boolean unregister(MessageListener listener) {
79 listener = new InvokerMessageListener(listener);
80
81 boolean value = _listeners.remove(listener);
82
83 setListenerCount(_listeners.size());
84
85 return value;
86 }
87
88 protected abstract void dispatch(
89 Iterator<MessageListener> listenersItr, Message message);
90
91 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
92
93 private Set<MessageListener> _listeners =
94 new ConcurrentHashSet<MessageListener>();
95
96 }