1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    *
5    *
6    *
7    * The contents of this file are subject to the terms of the Liferay Enterprise
8    * Subscription License ("License"). You may not use this file except in
9    * compliance with the License. You can obtain a copy of the License by
10   * contacting Liferay, Inc. See the License for the specific language governing
11   * permissions and limitations under the License, including but not limited to
12   * distribution rights of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.kernel.messaging;
24  
25  import com.liferay.portal.kernel.log.Log;
26  import com.liferay.portal.kernel.log.LogFactoryUtil;
27  import com.liferay.portal.kernel.util.ConcurrentHashSet;
28  import com.liferay.portal.kernel.util.NamedThreadFactory;
29  import com.liferay.portal.kernel.util.StringPool;
30  import com.liferay.portal.kernel.util.Validator;
31  
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.LinkedBlockingQueue;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  
38  /**
39   * <a href="BaseDestination.java.html"><b><i>View Source</i></b></a>
40   *
41   * @author Michael C. Han
42   */
43  public abstract class BaseDestination implements Destination {
44  
45      public BaseDestination() {
46      }
47  
48      /**
49       * @deprecated
50       */
51      public BaseDestination(String name) {
52          this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
53      }
54  
55      /**
56       * @deprecated
57       */
58      public BaseDestination(
59          String name, int workersCoreSize, int workersMaxSize) {
60  
61          _name = name;
62          _workersCoreSize = workersCoreSize;
63          _workersMaxSize = workersMaxSize;
64  
65          open();
66      }
67  
68      public void addDestinationEventListener(
69          DestinationEventListener destinationEventListener) {
70  
71          _destinationEventListeners.add(destinationEventListener);
72      }
73  
74      public void afterPropertiesSet() {
75          if (Validator.isNull(_name)) {
76              throw new IllegalArgumentException("Name is null");
77          }
78  
79          open();
80      }
81  
82      public synchronized void close() {
83          close(false);
84      }
85  
86      public synchronized void close(boolean force) {
87          doClose(force);
88      }
89  
90      public void copyDestinationEventListeners(Destination destination) {
91          for (DestinationEventListener destinationEventListener :
92                  _destinationEventListeners) {
93  
94              destination.addDestinationEventListener(
95                  destinationEventListener);
96          }
97      }
98  
99      public void copyMessageListeners(Destination destination) {
100         for (MessageListener messageListener : _messageListeners) {
101             InvokerMessageListener invokerMessageListener =
102                 (InvokerMessageListener)messageListener;
103 
104             destination.register(
105                 invokerMessageListener.getMessageListener(),
106                 invokerMessageListener.getClassLoader());
107         }
108     }
109 
110     public DestinationStatistics getDestinationStatistics() {
111         DestinationStatistics destinationStatistics =
112             new DestinationStatistics();
113 
114         destinationStatistics.setActiveThreadCount(
115             _threadPoolExecutor.getActiveCount());
116         destinationStatistics.setCurrentThreadCount(
117             _threadPoolExecutor.getPoolSize());
118         destinationStatistics.setLargestThreadCount(
119             _threadPoolExecutor.getLargestPoolSize());
120         destinationStatistics.setMaxThreadPoolSize(
121             _threadPoolExecutor.getMaximumPoolSize());
122         destinationStatistics.setMinThreadPoolSize(
123             _threadPoolExecutor.getCorePoolSize());
124         destinationStatistics.setPendingMessageCount(
125             _threadPoolExecutor.getQueue().size());
126         destinationStatistics.setSentMessageCount(
127             _threadPoolExecutor.getCompletedTaskCount());
128 
129         return destinationStatistics;
130     }
131 
132     public int getMaximumQueueSize() {
133         return _maximumQueueSize;
134     }
135 
136     public int getMessageListenerCount() {
137         return _messageListeners.size();
138     }
139 
140     public String getName() {
141         return _name;
142     }
143 
144     public int getWorkersCoreSize() {
145         return _workersCoreSize;
146     }
147 
148     public int getWorkersMaxSize() {
149         return _workersMaxSize;
150     }
151 
152     public boolean isRegistered() {
153         if (getMessageListenerCount() > 0) {
154             return true;
155         }
156         else {
157             return false;
158         }
159     }
160 
161     public synchronized void open() {
162         doOpen();
163     }
164 
165     public boolean register(MessageListener messageListener) {
166         InvokerMessageListener invokerMessageListener =
167             new InvokerMessageListener(messageListener);
168 
169         return registerMessageListener(invokerMessageListener);
170     }
171 
172     public boolean register(
173         MessageListener messageListener, ClassLoader classloader) {
174 
175         InvokerMessageListener invokerMessageListener =
176             new InvokerMessageListener(messageListener, classloader);
177 
178         return registerMessageListener(invokerMessageListener);
179     }
180 
181     public void removeDestinationEventListener(
182         DestinationEventListener destinationEventListener) {
183 
184         _destinationEventListeners.remove(destinationEventListener);
185     }
186 
187     public void removeDestinationEventListeners() {
188         _destinationEventListeners.clear();
189     }
190 
191     public void send(Message message) {
192         if (_messageListeners.isEmpty()) {
193             if (_log.isDebugEnabled()) {
194                 _log.debug("No message listeners for destination " + getName());
195             }
196 
197             return;
198         }
199 
200         ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
201 
202         if (threadPoolExecutor.isShutdown()) {
203             throw new IllegalStateException(
204                 "Destination " + getName() + " is shutdown and cannot " +
205                     "receive more messages");
206         }
207 
208         if ((_maximumQueueSize > -1) &&
209             (threadPoolExecutor.getQueue().size() > _maximumQueueSize)) {
210 
211             throw new IllegalStateException(
212                 threadPoolExecutor.getQueue().size() +
213                     " messages exceeds the maximum queue size of " +
214                         _maximumQueueSize);
215         }
216 
217         dispatch(_messageListeners, message);
218     }
219 
220     public void setMaximumQueueSize(int maximumQueueSize) {
221         _maximumQueueSize = maximumQueueSize;
222     }
223 
224     public void setName(String name) {
225         _name = name;
226     }
227 
228     public void setWorkersCoreSize(int workersCoreSize) {
229         _workersCoreSize = workersCoreSize;
230     }
231 
232     public void setWorkersMaxSize(int workersMaxSize) {
233         _workersMaxSize = workersMaxSize;
234     }
235 
236     public boolean unregister(MessageListener messageListener) {
237         InvokerMessageListener invokerMessageListener =
238             new InvokerMessageListener(messageListener);
239 
240         return unregisterMessageListener(invokerMessageListener);
241     }
242 
243     public boolean unregister(
244         MessageListener messageListener, ClassLoader classloader) {
245 
246         InvokerMessageListener invokerMessageListener =
247             new InvokerMessageListener(messageListener, classloader);
248 
249         return unregisterMessageListener(invokerMessageListener);
250     }
251 
252     public void unregisterMessageListeners() {
253         for (MessageListener messageListener : _messageListeners) {
254             unregisterMessageListener((InvokerMessageListener)messageListener);
255         }
256     }
257 
258     protected abstract void dispatch(
259         Set<MessageListener> messageListeners, Message message);
260 
261     protected void doClose(boolean force) {
262         if (!_threadPoolExecutor.isShutdown() &&
263             !_threadPoolExecutor.isTerminating()) {
264 
265             if (!force) {
266                 _threadPoolExecutor.shutdown();
267             }
268             else {
269                 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
270 
271                 if (_log.isInfoEnabled()) {
272                     _log.info(
273                         "The following " + pendingTasks.size() + " tasks " +
274                             "were not executed due to shutown: " +
275                                 pendingTasks);
276                 }
277             }
278         }
279     }
280 
281     protected void doOpen() {
282         if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
283             _threadPoolExecutor = new ThreadPoolExecutor(
284                 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
285                 new LinkedBlockingQueue<Runnable>(),
286                 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
287         }
288     }
289 
290     protected void fireMessageListenerRegisteredEvent(
291         MessageListener messageListener) {
292 
293         for (DestinationEventListener destinationEventListener :
294                 _destinationEventListeners) {
295 
296             destinationEventListener.messageListenerRegistered(
297                 getName(), messageListener);
298         }
299     }
300 
301     protected void fireMessageListenerUnregisteredEvent(
302         MessageListener messageListener) {
303 
304         for (DestinationEventListener listener : _destinationEventListeners) {
305             listener.messageListenerUnregistered(getName(), messageListener);
306         }
307     }
308 
309     protected ThreadPoolExecutor getThreadPoolExecutor() {
310         return _threadPoolExecutor;
311     }
312 
313     protected boolean registerMessageListener(
314         InvokerMessageListener invokerMessageListener) {
315 
316         boolean registered = _messageListeners.add(invokerMessageListener);
317 
318         if (registered) {
319             fireMessageListenerRegisteredEvent(
320                 invokerMessageListener.getMessageListener());
321         }
322 
323         return registered;
324     }
325 
326     protected boolean unregisterMessageListener(
327         InvokerMessageListener invokerMessageListener) {
328 
329         boolean unregistered = _messageListeners.remove(invokerMessageListener);
330 
331         if (unregistered) {
332             fireMessageListenerUnregisteredEvent(
333                 invokerMessageListener.getMessageListener());
334         }
335 
336         return unregistered;
337     }
338 
339     private static final int _WORKERS_CORE_SIZE = 2;
340 
341     private static final int _WORKERS_MAX_SIZE = 5;
342 
343     private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
344 
345     private Set<DestinationEventListener> _destinationEventListeners =
346         new ConcurrentHashSet<DestinationEventListener>();
347     private int _maximumQueueSize = -1;
348     private Set<MessageListener> _messageListeners =
349         new ConcurrentHashSet<MessageListener>();
350     private String _name = StringPool.BLANK;
351     private ThreadPoolExecutor _threadPoolExecutor;
352     private int _workersCoreSize = _WORKERS_CORE_SIZE;
353     private int _workersMaxSize = _WORKERS_MAX_SIZE;
354 
355 }