1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.NamedThreadFactory;
25
26 import java.util.List;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31
37 public abstract class BaseDestination implements Destination {
38
39 public BaseDestination(String name) {
40 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
41 }
42
43 public BaseDestination(
44 String name, int workersCoreSize, int workersMaxSize) {
45
46 _name = name;
47 _workersCoreSize = workersCoreSize;
48 _workersMaxSize = workersMaxSize;
49
50 open();
51 }
52
53 public synchronized void close() {
54 close(false);
55 }
56
57 public synchronized void close(boolean force) {
58 doClose(force);
59 }
60
61 public DestinationStatistics getStatistics() {
62 DestinationStatistics statistics = new DestinationStatistics();
63
64 statistics.setActiveThreadCount(_threadPoolExecutor.getActiveCount());
65 statistics.setCurrentThreadCount(_threadPoolExecutor.getPoolSize());
66 statistics.setLargestThreadCount(
67 _threadPoolExecutor.getLargestPoolSize());
68 statistics.setMaxThreadPoolSize(
69 _threadPoolExecutor.getMaximumPoolSize());
70 statistics.setMinThreadPoolSize(_threadPoolExecutor.getCorePoolSize());
71 statistics.setPendingMessageCount(_threadPoolExecutor.getTaskCount());
72 statistics.setSentMessageCount(
73 _threadPoolExecutor.getCompletedTaskCount());
74
75 return statistics;
76 }
77
78 public int getListenerCount() {
79 return _listenerCount;
80 }
81
82 public String getName() {
83 return _name;
84 }
85
86 public boolean isRegistered() {
87 if (_listenerCount > 0) {
88 return true;
89 }
90 else {
91 return false;
92 }
93 }
94
95 public synchronized void open() {
96 doOpen();
97 }
98
99 protected void doClose(boolean force) {
100 if (!_threadPoolExecutor.isShutdown() &&
101 !_threadPoolExecutor.isTerminating()) {
102
103 if (!force) {
104 _threadPoolExecutor.shutdown();
105 }
106 else {
107 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
108
109 if (_log.isInfoEnabled()) {
110 _log.info(
111 "The following " + pendingTasks.size() + " tasks " +
112 "were not executed due to shutown: " +
113 pendingTasks);
114 }
115 }
116 }
117 }
118
119 protected void doOpen() {
120 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
121 _threadPoolExecutor = new ThreadPoolExecutor(
122 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
123 new LinkedBlockingQueue<Runnable>(),
124 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
125 }
126 }
127
128 protected ThreadPoolExecutor getThreadPoolExecutor() {
129 return _threadPoolExecutor;
130 }
131
132 protected int getWorkersCoreSize() {
133 return _workersCoreSize;
134 }
135
136 protected int getWorkersMaxSize() {
137 return _workersMaxSize;
138 }
139
140 protected void setListenerCount(int listenerCount) {
141 _listenerCount = listenerCount;
142 }
143
144 private static final int _WORKERS_CORE_SIZE = 5;
145
146 private static final int _WORKERS_MAX_SIZE = 10;
147
148 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
149
150 private String _name;
151 private ThreadPoolExecutor _threadPoolExecutor;
152 private int _workersCoreSize;
153 private int _workersMaxSize;
154 private int _listenerCount;
155
156 }