1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    * Permission is hereby granted, free of charge, to any person obtaining a copy
5    * of this software and associated documentation files (the "Software"), to deal
6    * in the Software without restriction, including without limitation the rights
7    * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8    * copies of the Software, and to permit persons to whom the Software is
9    * furnished to do so, subject to the following conditions:
10   *
11   * The above copyright notice and this permission notice shall be included in
12   * all copies or substantial portions of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.kernel.messaging;
24  
25  import com.liferay.portal.kernel.log.Log;
26  import com.liferay.portal.kernel.log.LogFactoryUtil;
27  import com.liferay.portal.kernel.util.ConcurrentHashSet;
28  import com.liferay.portal.kernel.util.NamedThreadFactory;
29  
30  import java.util.List;
31  import java.util.Set;
32  import java.util.concurrent.LinkedBlockingQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  /**
37   * <a href="BaseDestination.java.html"><b><i>View Source</i></b></a>
38   *
39   * @author Michael C. Han
40   *
41   */
42  public abstract class BaseDestination implements Destination {
43  
44      public BaseDestination(String name) {
45          this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
46      }
47  
48      public BaseDestination(
49          String name, int workersCoreSize, int workersMaxSize) {
50  
51          _name = name;
52          _workersCoreSize = workersCoreSize;
53          _workersMaxSize = workersMaxSize;
54  
55          open();
56      }
57  
58      public synchronized void close() {
59          close(false);
60      }
61  
62      public synchronized void close(boolean force) {
63          doClose(force);
64      }
65  
66      public void copyMessageListeners(Destination destination) {
67          for (MessageListener messageListener : _messageListeners) {
68              InvokerMessageListener invokerMessageListener =
69                  (InvokerMessageListener)messageListener;
70  
71              destination.register(
72                  invokerMessageListener.getMessageListener(),
73                  invokerMessageListener.getClassLoader());
74          }
75      }
76  
77      public DestinationStatistics getDestinationStatistics() {
78          DestinationStatistics destinationStatistics =
79              new DestinationStatistics();
80  
81          destinationStatistics.setActiveThreadCount(
82              _threadPoolExecutor.getActiveCount());
83          destinationStatistics.setCurrentThreadCount(
84              _threadPoolExecutor.getPoolSize());
85          destinationStatistics.setLargestThreadCount(
86              _threadPoolExecutor.getLargestPoolSize());
87          destinationStatistics.setMaxThreadPoolSize(
88              _threadPoolExecutor.getMaximumPoolSize());
89          destinationStatistics.setMinThreadPoolSize(
90              _threadPoolExecutor.getCorePoolSize());
91          destinationStatistics.setPendingMessageCount(
92              _threadPoolExecutor.getQueue().size());
93          destinationStatistics.setSentMessageCount(
94              _threadPoolExecutor.getCompletedTaskCount());
95  
96          return destinationStatistics;
97      }
98  
99      public int getMessageListenerCount() {
100         return _messageListeners.size();
101     }
102 
103     public String getName() {
104         return _name;
105     }
106 
107     public boolean isRegistered() {
108         if (getMessageListenerCount() > 0) {
109             return true;
110         }
111         else {
112             return false;
113         }
114     }
115 
116     public synchronized void open() {
117         doOpen();
118     }
119 
120     public void register(MessageListener messageListener) {
121         InvokerMessageListener invokerMessageListener =
122             new InvokerMessageListener(messageListener);
123 
124         _messageListeners.add(invokerMessageListener);
125     }
126 
127     public void register(
128         MessageListener messageListener, ClassLoader classloader) {
129 
130         InvokerMessageListener invokerMessageListener =
131             new InvokerMessageListener(messageListener, classloader);
132 
133         _messageListeners.add(invokerMessageListener);
134     }
135 
136     public void send(Message message) {
137         if (_messageListeners.isEmpty()) {
138             if (_log.isDebugEnabled()) {
139                 _log.debug("No message listeners for destination " + getName());
140             }
141 
142             return;
143         }
144 
145         ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
146 
147         if (threadPoolExecutor.isShutdown()) {
148             throw new IllegalStateException(
149                 "Destination " + getName() + " is shutdown and cannot " +
150                     "receive more messages");
151         }
152 
153         dispatch(_messageListeners, message);
154     }
155 
156     public boolean unregister(MessageListener messageListener) {
157         InvokerMessageListener invokerMessageListener =
158             new InvokerMessageListener(messageListener);
159 
160         return _messageListeners.remove(invokerMessageListener);
161     }
162 
163     public boolean unregister(
164         MessageListener messageListener, ClassLoader classloader) {
165 
166         InvokerMessageListener invokerMessageListener =
167             new InvokerMessageListener(messageListener, classloader);
168 
169         return _messageListeners.remove(invokerMessageListener);
170     }
171 
172     protected abstract void dispatch(
173         Set<MessageListener> messageListeners, Message message);
174 
175     protected void doClose(boolean force) {
176         if (!_threadPoolExecutor.isShutdown() &&
177             !_threadPoolExecutor.isTerminating()) {
178 
179             if (!force) {
180                 _threadPoolExecutor.shutdown();
181             }
182             else {
183                 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
184 
185                 if (_log.isInfoEnabled()) {
186                     _log.info(
187                         "The following " + pendingTasks.size() + " tasks " +
188                             "were not executed due to shutown: " +
189                                 pendingTasks);
190                 }
191             }
192         }
193     }
194 
195     protected void doOpen() {
196         if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
197             _threadPoolExecutor = new ThreadPoolExecutor(
198                 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
199                 new LinkedBlockingQueue<Runnable>(),
200                 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
201         }
202     }
203 
204     protected ThreadPoolExecutor getThreadPoolExecutor() {
205         return _threadPoolExecutor;
206     }
207 
208     protected int getWorkersCoreSize() {
209         return _workersCoreSize;
210     }
211 
212     protected int getWorkersMaxSize() {
213         return _workersMaxSize;
214     }
215 
216     private static final int _WORKERS_CORE_SIZE = 2;
217 
218     private static final int _WORKERS_MAX_SIZE = 5;
219 
220     private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
221 
222     private Set<MessageListener> _messageListeners =
223         new ConcurrentHashSet<MessageListener>();
224     private String _name;
225     private ThreadPoolExecutor _threadPoolExecutor;
226     private int _workersCoreSize;
227     private int _workersMaxSize;
228 
229 }