1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
15   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
16   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
17   * SOFTWARE.
18   */
19  
20  package com.liferay.portal.kernel.messaging;
21  
22  import com.liferay.portal.kernel.log.Log;
23  import com.liferay.portal.kernel.log.LogFactoryUtil;
24  import com.liferay.portal.kernel.util.ListUtil;
25  
26  import java.util.Arrays;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  /**
33   * <a href="ArrayDispatcherDestination.java.html"><b><i>View Source</i></b></a>
34   *
35   * @author Michael C. Han
36   * @author Brian Wing Shun Chan
37   *
38   */
39  public abstract class ArrayDispatcherDestination extends BaseDestination {
40  
41      public ArrayDispatcherDestination(String name) {
42          super(name);
43      }
44  
45      public ArrayDispatcherDestination(
46          String name, int workersCoreSize, int workersMaxSize) {
47  
48          super(name, workersCoreSize, workersMaxSize);
49      }
50  
51      public synchronized void register(MessageListener listener) {
52          listener = new InvokerMessageListener(listener);
53  
54          Set<MessageListener> listeners = new HashSet<MessageListener>(
55              Arrays.asList(_listeners));
56  
57          listeners.add(listener);
58  
59          _listeners = listeners.toArray(
60              new MessageListener[listeners.size()]);
61  
62          setListenerCount(listeners.size());
63      }
64  
65      public void send(Message message) {
66          if (_listeners.length == 0) {
67              if (_log.isDebugEnabled()) {
68                  _log.debug("No listeners for destination " + getName());
69              }
70  
71              return;
72          }
73  
74          ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
75  
76          if (threadPoolExecutor.isShutdown()) {
77              throw new IllegalStateException(
78                  "Destination " + getName() + " is shutdown and cannot " +
79                      "receive more messages");
80          }
81  
82          dispatch(_listeners, message);
83      }
84  
85      public synchronized boolean unregister(MessageListener listener) {
86          listener = new InvokerMessageListener(listener);
87  
88          List<MessageListener> listeners = ListUtil.fromArray(_listeners);
89  
90          boolean value = listeners.remove(listener);
91  
92          if (value) {
93              _listeners = listeners.toArray(
94                  new MessageListener[listeners.size()]);
95  
96              setListenerCount(listeners.size());
97          }
98  
99          return value;
100     }
101 
102     protected abstract void dispatch(
103         MessageListener[] listeners, Message message);
104 
105     private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
106 
107     private MessageListener[] _listeners = new MessageListener[0];
108 
109 }