1
22
23 package com.liferay.portal.cluster;
24
25 import com.liferay.portal.kernel.cluster.Address;
26 import com.liferay.portal.kernel.cluster.ClusterLink;
27 import com.liferay.portal.kernel.cluster.Priority;
28 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
29 import com.liferay.portal.kernel.log.Log;
30 import com.liferay.portal.kernel.log.LogFactoryUtil;
31 import com.liferay.portal.kernel.messaging.Message;
32 import com.liferay.portal.kernel.util.GetterUtil;
33 import com.liferay.portal.kernel.util.IPDetector;
34 import com.liferay.portal.kernel.util.OSDetector;
35 import com.liferay.portal.kernel.util.PropsKeys;
36 import com.liferay.portal.kernel.util.SocketUtil;
37 import com.liferay.portal.kernel.util.StringPool;
38 import com.liferay.portal.kernel.util.Validator;
39 import com.liferay.portal.util.PropsUtil;
40 import com.liferay.portal.util.PropsValues;
41
42 import java.io.IOException;
43
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.List;
47 import java.util.Properties;
48 import java.util.Vector;
49
50 import org.jgroups.ChannelException;
51 import org.jgroups.JChannel;
52 import org.jgroups.ReceiverAdapter;
53 import org.jgroups.View;
54
55
60 public class ClusterLinkImpl implements ClusterLink {
61
62 public void afterPropertiesSet() {
63 if (!PropsValues.CLUSTER_LINK_ENABLED) {
64 return;
65 }
66
67 if (OSDetector.isUnix() && IPDetector.isSupportsV6() &&
68 !IPDetector.isPrefersV4() && _log.isWarnEnabled()) {
69
70 StringBuilder sb = new StringBuilder();
71
72 sb.append("You are on an Unix server with IPv6 enabled. JGroups ");
73 sb.append("may not work with IPv6. If you see a multicast ");
74 sb.append("error, try adding java.net.preferIPv4Stack=true ");
75 sb.append("as a JVM startup parameter.");
76
77 _log.warn(sb.toString());
78 }
79
80 initSystemProperties();
81
82 try {
83 initBindAddress();
84 }
85 catch (IOException ioe) {
86 if (_log.isWarnEnabled()) {
87 _log.warn("Failed to initialize outgoing IP address", ioe);
88 }
89 }
90
91 try {
92 initChannels();
93 }
94 catch (Exception e) {
95 _log.error(e, e);
96 }
97 }
98
99 public void destory() {
100 if (!PropsValues.CLUSTER_LINK_ENABLED) {
101 return;
102 }
103
104 for (JChannel channel : _channels) {
105 channel.close();
106 }
107 }
108
109 public List<Address> getAddresses() {
110 if (!PropsValues.CLUSTER_LINK_ENABLED) {
111 return Collections.EMPTY_LIST;
112 }
113
114 Vector<org.jgroups.Address> jGroupsAddresses =
115 _channels.get(0).getView().getMembers();
116
117 if (jGroupsAddresses == null) {
118 return new ArrayList<Address>();
119 }
120
121 List<Address> addresses = new ArrayList<Address>(
122 jGroupsAddresses.size());
123
124 for (org.jgroups.Address address : jGroupsAddresses) {
125 addresses.add(new AddressImpl(address));
126 }
127
128 return addresses;
129 }
130
131 public boolean isEnabled() {
132 return PropsValues.CLUSTER_LINK_ENABLED;
133 }
134
135 public void sendMulticastMessage(Message message, Priority priority) {
136 if (!PropsValues.CLUSTER_LINK_ENABLED) {
137 return;
138 }
139
140 JChannel channel = getChannel(priority);
141
142 try {
143 channel.send(null, null, message);
144 }
145 catch (ChannelException ce) {
146 _log.error("Unable to send multicast message " + message, ce);
147 }
148 }
149
150 public void sendUnicastMessage(
151 Address address, Message message, Priority priority) {
152
153 if (!PropsValues.CLUSTER_LINK_ENABLED) {
154 return;
155 }
156
157 org.jgroups.Address jGroupsAddress =
158 (org.jgroups.Address)address.getRealAddress();
159
160 JChannel channel = getChannel(priority);
161
162 try {
163 channel.send(jGroupsAddress, null, message);
164 }
165 catch (ChannelException ce) {
166 _log.error("Unable to send unicast message:" + message, ce);
167 }
168 }
169
170 public void setClusterForwardMessageListener(
171 ClusterForwardMessageListener clusterForwardMessageListener) {
172
173 _clusterForwardMessageListener = clusterForwardMessageListener;
174 }
175
176 protected JChannel createChannel(int index, String properties)
177 throws ChannelException {
178
179 JChannel channel = new JChannel(properties);
180
181 channel.setReceiver(
182 new ReceiverAdapter() {
183
184 public void receive(org.jgroups.Message message) {
185 if ((!_addresses.contains(message.getSrc())) ||
186 (message.getDest() != null)) {
187
188 _clusterForwardMessageListener.receive(
189 (Message)message.getObject());
190 }
191 else {
192 if (_log.isDebugEnabled()) {
193 _log.debug("Block received message " + message);
194 }
195 }
196 }
197
198 public void viewAccepted(View view) {
199 if (_log.isDebugEnabled()) {
200 _log.debug("Cluster link accepted view " + view);
201 }
202 }
203
204 }
205 );
206
207 channel.connect(_LIFERAY_CHANNEL + index);
208
209 if (_log.isInfoEnabled()) {
210 _log.info(
211 "Create a new channel with properties " +
212 channel.getProperties());
213 }
214
215 return channel;
216 }
217
218 protected JChannel getChannel(Priority priority) {
219 int channelIndex =
220 priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
221
222 if (_log.isDebugEnabled()) {
223 _log.debug(
224 "Select channel number " + channelIndex + " for priority " +
225 priority);
226 }
227
228 return _channels.get(channelIndex);
229 }
230
231 protected void initBindAddress() throws IOException {
232 String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
233
234 if (Validator.isNull(autodetectAddress)) {
235 return;
236 }
237
238 String host = autodetectAddress;
239 int port = 80;
240
241 int index = autodetectAddress.indexOf(StringPool.COLON);
242
243 if (index != -1) {
244 host = autodetectAddress.substring(0, index);
245 port = GetterUtil.getInteger(
246 autodetectAddress.substring(index + 1), port);
247 }
248
249 String bindAddress = SocketUtil.getHostAddress(host, port);
250
251 System.setProperty("jgroups.bind_addr", bindAddress);
252
253 if (_log.isInfoEnabled()) {
254 _log.info(
255 "Set JGroups outgoing IP address to " + bindAddress + "}");
256 }
257 }
258
259 protected void initChannels() throws ChannelException {
260 Properties properties = PropsUtil.getProperties(
261 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES, true);
262
263 _channelCount = properties.size();
264
265 if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
266 throw new IllegalArgumentException(
267 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
268 }
269
270 _addresses = new ArrayList<org.jgroups.Address>(_channelCount);
271 _channels = new ArrayList<JChannel>(_channelCount);
272
273 List<String> keys = new ArrayList<String>(_channelCount);
274
275 for (Object key : properties.keySet()) {
276 keys.add((String)key);
277 }
278
279 Collections.sort(keys);
280
281 for (int i = 0; i < keys.size(); i++) {
282 String customName = keys.get(i);
283
284 String value = properties.getProperty(customName);
285
286 JChannel channel = createChannel(i, value);
287
288 _addresses.add(channel.getLocalAddress());
289 _channels.add(channel);
290 }
291 }
292
293 protected void initSystemProperties() {
294 for (String systemProperty :
295 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
296
297 int index = systemProperty.indexOf(StringPool.COLON);
298
299 if (index == -1) {
300 continue;
301 }
302
303 String key = systemProperty.substring(0, index);
304 String value = systemProperty.substring(index + 1);
305
306 System.setProperty(key, value);
307
308 if (_log.isDebugEnabled()) {
309 _log.debug(
310 "Setting system property {key=" + key + ", value=" +
311 value + "}");
312 }
313 }
314 }
315
316 private static final String _LIFERAY_CHANNEL = "LIFERAY-CHANNEL-";
317
318 private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
319
320 private static final Log _log =
321 LogFactoryUtil.getLog(ClusterLinkImpl.class);
322
323 private List<org.jgroups.Address> _addresses;
324 private int _channelCount;
325 private List<JChannel> _channels;
326 private ClusterForwardMessageListener _clusterForwardMessageListener;
327
328 }