1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ConcurrentHashSet;
28 import com.liferay.portal.kernel.util.NamedThreadFactory;
29
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36
42 public abstract class BaseDestination implements Destination {
43
44 public BaseDestination(String name) {
45 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
46 }
47
48 public BaseDestination(
49 String name, int workersCoreSize, int workersMaxSize) {
50
51 _name = name;
52 _workersCoreSize = workersCoreSize;
53 _workersMaxSize = workersMaxSize;
54
55 open();
56 }
57
58 public synchronized void close() {
59 close(false);
60 }
61
62 public synchronized void close(boolean force) {
63 doClose(force);
64 }
65
66 public void copyMessageListeners(Destination destination) {
67 for (MessageListener messageListener : _messageListeners) {
68 InvokerMessageListener invokerMessageListener =
69 (InvokerMessageListener)messageListener;
70
71 destination.register(
72 invokerMessageListener.getMessageListener(),
73 invokerMessageListener.getClassLoader());
74 }
75 }
76
77 public DestinationStatistics getDestinationStatistics() {
78 DestinationStatistics destinationStatistics =
79 new DestinationStatistics();
80
81 destinationStatistics.setActiveThreadCount(
82 _threadPoolExecutor.getActiveCount());
83 destinationStatistics.setCurrentThreadCount(
84 _threadPoolExecutor.getPoolSize());
85 destinationStatistics.setLargestThreadCount(
86 _threadPoolExecutor.getLargestPoolSize());
87 destinationStatistics.setMaxThreadPoolSize(
88 _threadPoolExecutor.getMaximumPoolSize());
89 destinationStatistics.setMinThreadPoolSize(
90 _threadPoolExecutor.getCorePoolSize());
91 destinationStatistics.setPendingMessageCount(
92 _threadPoolExecutor.getQueue().size());
93 destinationStatistics.setSentMessageCount(
94 _threadPoolExecutor.getCompletedTaskCount());
95
96 return destinationStatistics;
97 }
98
99 public int getMessageListenerCount() {
100 return _messageListeners.size();
101 }
102
103 public String getName() {
104 return _name;
105 }
106
107 public boolean isRegistered() {
108 if (getMessageListenerCount() > 0) {
109 return true;
110 }
111 else {
112 return false;
113 }
114 }
115
116 public synchronized void open() {
117 doOpen();
118 }
119
120 public void register(MessageListener messageListener) {
121 InvokerMessageListener invokerMessageListener =
122 new InvokerMessageListener(messageListener);
123
124 _messageListeners.add(invokerMessageListener);
125 }
126
127 public void register(
128 MessageListener messageListener, ClassLoader classloader) {
129
130 InvokerMessageListener invokerMessageListener =
131 new InvokerMessageListener(messageListener, classloader);
132
133 _messageListeners.add(invokerMessageListener);
134 }
135
136 public void send(Message message) {
137 if (_messageListeners.isEmpty()) {
138 if (_log.isDebugEnabled()) {
139 _log.debug("No message listeners for destination " + getName());
140 }
141
142 return;
143 }
144
145 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
146
147 if (threadPoolExecutor.isShutdown()) {
148 throw new IllegalStateException(
149 "Destination " + getName() + " is shutdown and cannot " +
150 "receive more messages");
151 }
152
153 dispatch(_messageListeners, message);
154 }
155
156 public boolean unregister(MessageListener messageListener) {
157 InvokerMessageListener invokerMessageListener =
158 new InvokerMessageListener(messageListener);
159
160 return _messageListeners.remove(invokerMessageListener);
161 }
162
163 public boolean unregister(
164 MessageListener messageListener, ClassLoader classloader) {
165
166 InvokerMessageListener invokerMessageListener =
167 new InvokerMessageListener(messageListener, classloader);
168
169 return _messageListeners.remove(invokerMessageListener);
170 }
171
172 protected abstract void dispatch(
173 Set<MessageListener> messageListeners, Message message);
174
175 protected void doClose(boolean force) {
176 if (!_threadPoolExecutor.isShutdown() &&
177 !_threadPoolExecutor.isTerminating()) {
178
179 if (!force) {
180 _threadPoolExecutor.shutdown();
181 }
182 else {
183 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
184
185 if (_log.isInfoEnabled()) {
186 _log.info(
187 "The following " + pendingTasks.size() + " tasks " +
188 "were not executed due to shutown: " +
189 pendingTasks);
190 }
191 }
192 }
193 }
194
195 protected void doOpen() {
196 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
197 _threadPoolExecutor = new ThreadPoolExecutor(
198 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
199 new LinkedBlockingQueue<Runnable>(),
200 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
201 }
202 }
203
204 protected ThreadPoolExecutor getThreadPoolExecutor() {
205 return _threadPoolExecutor;
206 }
207
208 protected int getWorkersCoreSize() {
209 return _workersCoreSize;
210 }
211
212 protected int getWorkersMaxSize() {
213 return _workersMaxSize;
214 }
215
216 private static final int _WORKERS_CORE_SIZE = 2;
217
218 private static final int _WORKERS_MAX_SIZE = 5;
219
220 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
221
222 private Set<MessageListener> _messageListeners =
223 new ConcurrentHashSet<MessageListener>();
224 private String _name;
225 private ThreadPoolExecutor _threadPoolExecutor;
226 private int _workersCoreSize;
227 private int _workersMaxSize;
228
229 }