1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.ListUtil;
25
26 import java.util.Arrays;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.concurrent.ThreadPoolExecutor;
31
32
39 public abstract class ArrayDispatcherDestination extends BaseDestination {
40
41 public ArrayDispatcherDestination(String name) {
42 super(name);
43 }
44
45 public ArrayDispatcherDestination(
46 String name, int workersCoreSize, int workersMaxSize) {
47
48 super(name, workersCoreSize, workersMaxSize);
49 }
50
51 public synchronized void register(MessageListener listener) {
52 listener = new InvokerMessageListener(listener);
53
54 Set<MessageListener> listeners = new HashSet<MessageListener>(
55 Arrays.asList(_listeners));
56
57 listeners.add(listener);
58
59 _listeners = listeners.toArray(
60 new MessageListener[listeners.size()]);
61
62 setListenerCount(listeners.size());
63 }
64
65 public void send(Message message) {
66 if (_listeners.length == 0) {
67 if (_log.isDebugEnabled()) {
68 _log.debug("No listeners for destination " + getName());
69 }
70
71 return;
72 }
73
74 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
75
76 if (threadPoolExecutor.isShutdown()) {
77 throw new IllegalStateException(
78 "Destination " + getName() + " is shutdown and cannot " +
79 "receive more messages");
80 }
81
82 dispatch(_listeners, message);
83 }
84
85 public synchronized boolean unregister(MessageListener listener) {
86 listener = new InvokerMessageListener(listener);
87
88 List<MessageListener> listeners = ListUtil.fromArray(_listeners);
89
90 boolean value = listeners.remove(listener);
91
92 if (value) {
93 _listeners = listeners.toArray(
94 new MessageListener[listeners.size()]);
95
96 setListenerCount(listeners.size());
97 }
98
99 return value;
100 }
101
102 protected abstract void dispatch(
103 MessageListener[] listeners, Message message);
104
105 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
106
107 private MessageListener[] _listeners = new MessageListener[0];
108
109 }