1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33
34
40 public class DefaultMessageBus implements MessageBus {
41
42 public synchronized void addDestination(Destination destination) {
43 _destinations.put(destination.getName(), destination);
44 fireDestinationAddedEvent(destination);
45 }
46
47 public void addDestinationEventListener(DestinationEventListener listener) {
48 _destinationEventListeners.add(listener);
49 }
50
51 public void destroy() {
52 shutdown(true);
53 }
54
55 public int getDestinationCount() {
56 return _destinations.size();
57 }
58
59 public Collection<String> getDestinationNames() {
60 return _destinations.keySet();
61 }
62
63 public Collection<Destination> getDestinations() {
64 return _destinations.values();
65 }
66
67 public boolean hasDestination(String destinationName) {
68 return _destinations.containsKey(destinationName);
69 }
70
71 public boolean hasMessageListener(String destination) {
72 Destination destinationModel = _destinations.get(destination);
73
74 if ((destinationModel != null) && destinationModel.isRegistered()) {
75 return true;
76 }
77 else {
78 return false;
79 }
80 }
81
82 public synchronized void registerMessageListener(
83 String destination, MessageListener listener) {
84
85 Destination destinationModel = _destinations.get(destination);
86
87 if (destinationModel == null) {
88 throw new IllegalStateException(
89 "Destination " + destination + " is not configured");
90 }
91
92 destinationModel.register(listener);
93 }
94
95 public synchronized void removeDestination(String destination) {
96 Destination destinationModel = _destinations.remove(destination);
97
98 fireDestinationRemovedEvent(destinationModel);
99 }
100
101 public void removeDestinationEventListener(
102 DestinationEventListener listener) {
103
104 _destinationEventListeners.remove(listener);
105 }
106
107 public void sendMessage(String destination, Message message) {
108 Destination destinationModel = _destinations.get(destination);
109
110 if (destinationModel == null) {
111 if (_log.isWarnEnabled()) {
112 _log.warn("Destination " + destination + " is not configured");
113 }
114
115 return;
116 }
117
118 message.setDestination(destination);
119
120 destinationModel.send(message);
121 }
122
123 public void shutdown() {
124 shutdown(false);
125 }
126
127 public synchronized void shutdown(boolean force) {
128 for (Destination destination : _destinations.values()) {
129 destination.close(force);
130 }
131 }
132
133 public synchronized boolean unregisterMessageListener(
134 String destination, MessageListener listener) {
135
136 Destination destinationModel = _destinations.get(destination);
137
138 if (destinationModel == null) {
139 return false;
140 }
141
142 return destinationModel.unregister(listener);
143 }
144
145 protected void fireDestinationAddedEvent(Destination destination) {
146 for (DestinationEventListener listener : _destinationEventListeners) {
147 listener.destinationAdded(destination);
148 }
149 }
150
151 protected void fireDestinationRemovedEvent(Destination destination) {
152 for (DestinationEventListener listener : _destinationEventListeners) {
153 listener.destinationRemoved(destination);
154 }
155 }
156
157 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
158
159 private List<DestinationEventListener> _destinationEventListeners =
160 new ArrayList<DestinationEventListener>();
161 private Map<String, Destination> _destinations =
162 new HashMap<String, Destination>();
163
164 }