1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ConcurrentHashSet;
28
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.Set;
33
34
39 public class DefaultMessageBus implements MessageBus {
40
41 public synchronized void addDestination(Destination destination) {
42 _destinations.put(destination.getName(), destination);
43
44 fireDestinationAddedEvent(destination);
45 }
46
47 public void addDestinationEventListener(
48 DestinationEventListener destinationEventListener) {
49
50 _destinationEventListeners.add(destinationEventListener);
51 }
52
53 public void addDestinationEventListener(
54 String destinationName,
55 DestinationEventListener destinationEventListener) {
56
57 Destination destination = _destinations.get(destinationName);
58
59 if (destination != null) {
60 destination.addDestinationEventListener(destinationEventListener);
61 }
62 }
63
64 public void destroy() {
65 shutdown(true);
66 }
67
68 public Destination getDestination(String destinationName) {
69 return _destinations.get(destinationName);
70 }
71
72 public int getDestinationCount() {
73 return _destinations.size();
74 }
75
76 public Collection<String> getDestinationNames() {
77 return _destinations.keySet();
78 }
79
80 public Collection<Destination> getDestinations() {
81 return _destinations.values();
82 }
83
84 public boolean hasDestination(String destinationName) {
85 return _destinations.containsKey(destinationName);
86 }
87
88 public boolean hasMessageListener(String destinationName) {
89 Destination destination = _destinations.get(destinationName);
90
91 if ((destination != null) && destination.isRegistered()) {
92 return true;
93 }
94 else {
95 return false;
96 }
97 }
98
99 public synchronized boolean registerMessageListener(
100 String destinationName, MessageListener messageListener) {
101
102 Destination destination = _destinations.get(destinationName);
103
104 if (destination == null) {
105 throw new IllegalStateException(
106 "Destination " + destinationName + " is not configured");
107 }
108
109 boolean registered = destination.register(messageListener);
110
111 if (registered) {
112 fireMessageListenerRegisteredEvent(destination, messageListener);
113 }
114
115 return registered;
116 }
117
118 public synchronized Destination removeDestination(String destinationName) {
119 Destination destinationModel = _destinations.remove(destinationName);
120
121 destinationModel.removeDestinationEventListeners();
122 destinationModel.unregisterMessageListeners();
123
124 fireDestinationRemovedEvent(destinationModel);
125
126 return destinationModel;
127 }
128
129 public void removeDestinationEventListener(
130 DestinationEventListener destinationEventListener) {
131
132 _destinationEventListeners.remove(destinationEventListener);
133 }
134
135 public void removeDestinationEventListener(
136 String destinationName,
137 DestinationEventListener destinationEventListener) {
138
139 Destination destination = _destinations.get(destinationName);
140
141 if (destination != null) {
142 destination.removeDestinationEventListener(
143 destinationEventListener);
144 }
145 }
146
147 public void replace(Destination destination) {
148 Destination oldDestination = _destinations.get(destination.getName());
149
150 oldDestination.copyDestinationEventListeners(destination);
151 oldDestination.copyMessageListeners(destination);
152
153 removeDestination(oldDestination.getName());
154
155 addDestination(destination);
156 }
157
158 public void sendMessage(String destinationName, Message message) {
159 Destination destination = _destinations.get(destinationName);
160
161 if (destination == null) {
162 if (_log.isWarnEnabled()) {
163 _log.warn(
164 "Destination " + destinationName + " is not configured");
165 }
166
167 return;
168 }
169
170 message.setDestinationName(destinationName);
171
172 destination.send(message);
173 }
174
175 public void shutdown() {
176 shutdown(false);
177 }
178
179 public synchronized void shutdown(boolean force) {
180 for (Destination destination : _destinations.values()) {
181 destination.close(force);
182 }
183 }
184
185 public synchronized boolean unregisterMessageListener(
186 String destinationName, MessageListener messageListener) {
187
188 Destination destination = _destinations.get(destinationName);
189
190 if (destination == null) {
191 return false;
192 }
193
194 boolean unregistered = destination.unregister(messageListener);
195
196 if (unregistered) {
197 fireMessageListenerUnregisteredEvent(destination, messageListener);
198 }
199
200 return unregistered;
201 }
202
203 protected void fireDestinationAddedEvent(Destination destination) {
204 for (DestinationEventListener listener : _destinationEventListeners) {
205 listener.destinationAdded(destination);
206 }
207 }
208
209 protected void fireDestinationRemovedEvent(Destination destination) {
210 for (DestinationEventListener listener : _destinationEventListeners) {
211 listener.destinationRemoved(destination);
212 }
213 }
214
215 protected void fireMessageListenerRegisteredEvent(
216 Destination destination, MessageListener messageListener) {
217
218 for (DestinationEventListener destinationEventListener :
219 _destinationEventListeners) {
220
221 destinationEventListener.messageListenerRegistered(
222 destination.getName(), messageListener);
223 }
224 }
225
226 protected void fireMessageListenerUnregisteredEvent(
227 Destination destination, MessageListener messageListener) {
228
229 for (DestinationEventListener destinationEventListener :
230 _destinationEventListeners) {
231
232 destinationEventListener.messageListenerUnregistered(
233 destination.getName(), messageListener);
234 }
235 }
236
237 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
238
239 private Set<DestinationEventListener> _destinationEventListeners =
240 new ConcurrentHashSet<DestinationEventListener>();
241 private Map<String, Destination> _destinations =
242 new HashMap<String, Destination>();
243
244 }