1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ConcurrentHashSet;
28 import com.liferay.portal.kernel.util.NamedThreadFactory;
29 import com.liferay.portal.kernel.util.StringPool;
30 import com.liferay.portal.kernel.util.Validator;
31
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37
38
43 public abstract class BaseDestination implements Destination {
44
45 public BaseDestination() {
46 }
47
48
51 public BaseDestination(String name) {
52 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
53 }
54
55
58 public BaseDestination(
59 String name, int workersCoreSize, int workersMaxSize) {
60
61 _name = name;
62 _workersCoreSize = workersCoreSize;
63 _workersMaxSize = workersMaxSize;
64
65 open();
66 }
67
68 public void addDestinationEventListener(
69 DestinationEventListener destinationEventListener) {
70
71 _destinationEventListeners.add(destinationEventListener);
72 }
73
74 public void afterPropertiesSet() {
75 if (Validator.isNull(_name)) {
76 throw new IllegalArgumentException("Name is null");
77 }
78
79 open();
80 }
81
82 public synchronized void close() {
83 close(false);
84 }
85
86 public synchronized void close(boolean force) {
87 doClose(force);
88 }
89
90 public void copyDestinationEventListeners(Destination destination) {
91 for (DestinationEventListener destinationEventListener :
92 _destinationEventListeners) {
93
94 destination.addDestinationEventListener(
95 destinationEventListener);
96 }
97 }
98
99 public void copyMessageListeners(Destination destination) {
100 for (MessageListener messageListener : _messageListeners) {
101 InvokerMessageListener invokerMessageListener =
102 (InvokerMessageListener)messageListener;
103
104 destination.register(
105 invokerMessageListener.getMessageListener(),
106 invokerMessageListener.getClassLoader());
107 }
108 }
109
110 public DestinationStatistics getDestinationStatistics() {
111 DestinationStatistics destinationStatistics =
112 new DestinationStatistics();
113
114 destinationStatistics.setActiveThreadCount(
115 _threadPoolExecutor.getActiveCount());
116 destinationStatistics.setCurrentThreadCount(
117 _threadPoolExecutor.getPoolSize());
118 destinationStatistics.setLargestThreadCount(
119 _threadPoolExecutor.getLargestPoolSize());
120 destinationStatistics.setMaxThreadPoolSize(
121 _threadPoolExecutor.getMaximumPoolSize());
122 destinationStatistics.setMinThreadPoolSize(
123 _threadPoolExecutor.getCorePoolSize());
124 destinationStatistics.setPendingMessageCount(
125 _threadPoolExecutor.getQueue().size());
126 destinationStatistics.setSentMessageCount(
127 _threadPoolExecutor.getCompletedTaskCount());
128
129 return destinationStatistics;
130 }
131
132 public int getMaximumQueueSize() {
133 return _maximumQueueSize;
134 }
135
136 public int getMessageListenerCount() {
137 return _messageListeners.size();
138 }
139
140 public String getName() {
141 return _name;
142 }
143
144 public int getWorkersCoreSize() {
145 return _workersCoreSize;
146 }
147
148 public int getWorkersMaxSize() {
149 return _workersMaxSize;
150 }
151
152 public boolean isRegistered() {
153 if (getMessageListenerCount() > 0) {
154 return true;
155 }
156 else {
157 return false;
158 }
159 }
160
161 public synchronized void open() {
162 doOpen();
163 }
164
165 public boolean register(MessageListener messageListener) {
166 InvokerMessageListener invokerMessageListener =
167 new InvokerMessageListener(messageListener);
168
169 return registerMessageListener(invokerMessageListener);
170 }
171
172 public boolean register(
173 MessageListener messageListener, ClassLoader classloader) {
174
175 InvokerMessageListener invokerMessageListener =
176 new InvokerMessageListener(messageListener, classloader);
177
178 return registerMessageListener(invokerMessageListener);
179 }
180
181 public void removeDestinationEventListener(
182 DestinationEventListener destinationEventListener) {
183
184 _destinationEventListeners.remove(destinationEventListener);
185 }
186
187 public void removeDestinationEventListeners() {
188 _destinationEventListeners.clear();
189 }
190
191 public void send(Message message) {
192 if (_messageListeners.isEmpty()) {
193 if (_log.isDebugEnabled()) {
194 _log.debug("No message listeners for destination " + getName());
195 }
196
197 return;
198 }
199
200 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
201
202 if (threadPoolExecutor.isShutdown()) {
203 throw new IllegalStateException(
204 "Destination " + getName() + " is shutdown and cannot " +
205 "receive more messages");
206 }
207
208 if ((_maximumQueueSize > -1) &&
209 (threadPoolExecutor.getQueue().size() > _maximumQueueSize)) {
210
211 throw new IllegalStateException(
212 threadPoolExecutor.getQueue().size() +
213 " messages exceeds the maximum queue size of " +
214 _maximumQueueSize);
215 }
216
217 dispatch(_messageListeners, message);
218 }
219
220 public void setMaximumQueueSize(int maximumQueueSize) {
221 _maximumQueueSize = maximumQueueSize;
222 }
223
224 public void setName(String name) {
225 _name = name;
226 }
227
228 public void setWorkersCoreSize(int workersCoreSize) {
229 _workersCoreSize = workersCoreSize;
230 }
231
232 public void setWorkersMaxSize(int workersMaxSize) {
233 _workersMaxSize = workersMaxSize;
234 }
235
236 public boolean unregister(MessageListener messageListener) {
237 InvokerMessageListener invokerMessageListener =
238 new InvokerMessageListener(messageListener);
239
240 return unregisterMessageListener(invokerMessageListener);
241 }
242
243 public boolean unregister(
244 MessageListener messageListener, ClassLoader classloader) {
245
246 InvokerMessageListener invokerMessageListener =
247 new InvokerMessageListener(messageListener, classloader);
248
249 return unregisterMessageListener(invokerMessageListener);
250 }
251
252 public void unregisterMessageListeners() {
253 for (MessageListener messageListener : _messageListeners) {
254 unregisterMessageListener((InvokerMessageListener)messageListener);
255 }
256 }
257
258 protected abstract void dispatch(
259 Set<MessageListener> messageListeners, Message message);
260
261 protected void doClose(boolean force) {
262 if (!_threadPoolExecutor.isShutdown() &&
263 !_threadPoolExecutor.isTerminating()) {
264
265 if (!force) {
266 _threadPoolExecutor.shutdown();
267 }
268 else {
269 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
270
271 if (_log.isInfoEnabled()) {
272 _log.info(
273 "The following " + pendingTasks.size() + " tasks " +
274 "were not executed due to shutown: " +
275 pendingTasks);
276 }
277 }
278 }
279 }
280
281 protected void doOpen() {
282 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
283 _threadPoolExecutor = new ThreadPoolExecutor(
284 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
285 new LinkedBlockingQueue<Runnable>(),
286 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
287 }
288 }
289
290 protected void fireMessageListenerRegisteredEvent(
291 MessageListener messageListener) {
292
293 for (DestinationEventListener destinationEventListener :
294 _destinationEventListeners) {
295
296 destinationEventListener.messageListenerRegistered(
297 getName(), messageListener);
298 }
299 }
300
301 protected void fireMessageListenerUnregisteredEvent(
302 MessageListener messageListener) {
303
304 for (DestinationEventListener listener : _destinationEventListeners) {
305 listener.messageListenerUnregistered(getName(), messageListener);
306 }
307 }
308
309 protected ThreadPoolExecutor getThreadPoolExecutor() {
310 return _threadPoolExecutor;
311 }
312
313 protected boolean registerMessageListener(
314 InvokerMessageListener invokerMessageListener) {
315
316 boolean registered = _messageListeners.add(invokerMessageListener);
317
318 if (registered) {
319 fireMessageListenerRegisteredEvent(
320 invokerMessageListener.getMessageListener());
321 }
322
323 return registered;
324 }
325
326 protected boolean unregisterMessageListener(
327 InvokerMessageListener invokerMessageListener) {
328
329 boolean unregistered = _messageListeners.remove(invokerMessageListener);
330
331 if (unregistered) {
332 fireMessageListenerUnregisteredEvent(
333 invokerMessageListener.getMessageListener());
334 }
335
336 return unregistered;
337 }
338
339 private static final int _WORKERS_CORE_SIZE = 2;
340
341 private static final int _WORKERS_MAX_SIZE = 5;
342
343 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
344
345 private Set<DestinationEventListener> _destinationEventListeners =
346 new ConcurrentHashSet<DestinationEventListener>();
347 private int _maximumQueueSize = -1;
348 private Set<MessageListener> _messageListeners =
349 new ConcurrentHashSet<MessageListener>();
350 private String _name = StringPool.BLANK;
351 private ThreadPoolExecutor _threadPoolExecutor;
352 private int _workersCoreSize = _WORKERS_CORE_SIZE;
353 private int _workersMaxSize = _WORKERS_MAX_SIZE;
354
355 }