1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    *
5    *
6    *
7    * The contents of this file are subject to the terms of the Liferay Enterprise
8    * Subscription License ("License"). You may not use this file except in
9    * compliance with the License. You can obtain a copy of the License by
10   * contacting Liferay, Inc. See the License for the specific language governing
11   * permissions and limitations under the License, including but not limited to
12   * distribution rights of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.cluster;
24  
25  import com.liferay.portal.kernel.cluster.Address;
26  import com.liferay.portal.kernel.cluster.ClusterLink;
27  import com.liferay.portal.kernel.cluster.Priority;
28  import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
29  import com.liferay.portal.kernel.log.Log;
30  import com.liferay.portal.kernel.log.LogFactoryUtil;
31  import com.liferay.portal.kernel.messaging.Message;
32  import com.liferay.portal.kernel.util.GetterUtil;
33  import com.liferay.portal.kernel.util.IPDetector;
34  import com.liferay.portal.kernel.util.OSDetector;
35  import com.liferay.portal.kernel.util.PropsKeys;
36  import com.liferay.portal.kernel.util.SocketUtil;
37  import com.liferay.portal.kernel.util.StringPool;
38  import com.liferay.portal.kernel.util.Validator;
39  import com.liferay.portal.util.PropsUtil;
40  import com.liferay.portal.util.PropsValues;
41  
42  import java.io.IOException;
43  
44  import java.util.ArrayList;
45  import java.util.Collections;
46  import java.util.List;
47  import java.util.Properties;
48  import java.util.Vector;
49  
50  import org.jgroups.ChannelException;
51  import org.jgroups.JChannel;
52  import org.jgroups.ReceiverAdapter;
53  import org.jgroups.View;
54  
55  /**
56   * <a href="ClusterLinkImpl.java.html"><b><i>View Source</i></b></a>
57   *
58   * @author Shuyang Zhou
59   */
60  public class ClusterLinkImpl implements ClusterLink {
61  
62      public void afterPropertiesSet() {
63          if (!PropsValues.CLUSTER_LINK_ENABLED) {
64              return;
65          }
66  
67          if (OSDetector.isUnix() && IPDetector.isSupportsV6() &&
68              !IPDetector.isPrefersV4() && _log.isWarnEnabled()) {
69  
70              StringBuilder sb = new StringBuilder();
71  
72              sb.append("You are on an Unix server with IPv6 enabled. JGroups ");
73              sb.append("may not work with IPv6. If you see a multicast ");
74              sb.append("error, try adding java.net.preferIPv4Stack=true ");
75              sb.append("as a JVM startup parameter.");
76  
77              _log.warn(sb.toString());
78          }
79  
80          initSystemProperties();
81  
82          try {
83              initBindAddress();
84          }
85          catch (IOException ioe) {
86              if (_log.isWarnEnabled()) {
87                  _log.warn("Failed to initialize outgoing IP address", ioe);
88              }
89          }
90  
91          try {
92              initChannels();
93          }
94          catch (Exception e) {
95              _log.error(e, e);
96          }
97      }
98  
99      public void destory() {
100         if (!PropsValues.CLUSTER_LINK_ENABLED) {
101             return;
102         }
103 
104         for (JChannel channel : _channels) {
105             channel.close();
106         }
107     }
108 
109     public List<Address> getAddresses() {
110         if (!PropsValues.CLUSTER_LINK_ENABLED) {
111             return Collections.EMPTY_LIST;
112         }
113 
114         Vector<org.jgroups.Address> jGroupsAddresses =
115             _channels.get(0).getView().getMembers();
116 
117         if (jGroupsAddresses == null) {
118             return new ArrayList<Address>();
119         }
120 
121         List<Address> addresses = new ArrayList<Address>(
122             jGroupsAddresses.size());
123 
124         for (org.jgroups.Address address : jGroupsAddresses) {
125             addresses.add(new AddressImpl(address));
126         }
127 
128         return addresses;
129     }
130 
131     public boolean isEnabled() {
132         return PropsValues.CLUSTER_LINK_ENABLED;
133     }
134 
135     public void sendMulticastMessage(Message message, Priority priority) {
136         if (!PropsValues.CLUSTER_LINK_ENABLED) {
137             return;
138         }
139 
140         JChannel channel = getChannel(priority);
141 
142         try {
143             channel.send(null, null, message);
144         }
145         catch (ChannelException ce) {
146             _log.error("Unable to send multicast message " + message, ce);
147         }
148     }
149 
150     public void sendUnicastMessage(
151         Address address, Message message, Priority priority) {
152 
153         if (!PropsValues.CLUSTER_LINK_ENABLED) {
154             return;
155         }
156 
157         org.jgroups.Address jGroupsAddress =
158             (org.jgroups.Address)address.getRealAddress();
159 
160         JChannel channel = getChannel(priority);
161 
162         try {
163             channel.send(jGroupsAddress, null, message);
164         }
165         catch (ChannelException ce) {
166             _log.error("Unable to send unicast message:" + message, ce);
167         }
168     }
169 
170     public void setClusterForwardMessageListener(
171         ClusterForwardMessageListener clusterForwardMessageListener) {
172 
173         _clusterForwardMessageListener = clusterForwardMessageListener;
174     }
175 
176     protected JChannel createChannel(int index, String properties)
177         throws ChannelException {
178 
179         JChannel channel = new JChannel(properties);
180 
181         channel.setReceiver(
182             new ReceiverAdapter() {
183 
184                 public void receive(org.jgroups.Message message) {
185                     if ((!_addresses.contains(message.getSrc())) ||
186                         (message.getDest() != null)) {
187 
188                         _clusterForwardMessageListener.receive(
189                             (Message)message.getObject());
190                     }
191                     else {
192                         if (_log.isDebugEnabled()) {
193                             _log.debug("Block received message " + message);
194                         }
195                     }
196                 }
197 
198                 public void viewAccepted(View view) {
199                     if (_log.isDebugEnabled()) {
200                         _log.debug("Cluster link accepted view " + view);
201                     }
202                 }
203 
204             }
205         );
206 
207         channel.connect(_LIFERAY_CHANNEL + index);
208 
209         if (_log.isInfoEnabled()) {
210             _log.info(
211                 "Create a new channel with properties " +
212                     channel.getProperties());
213         }
214 
215         return channel;
216     }
217 
218     protected JChannel getChannel(Priority priority) {
219         int channelIndex =
220             priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
221 
222         if (_log.isDebugEnabled()) {
223             _log.debug(
224                 "Select channel number " + channelIndex + " for priority " +
225                     priority);
226         }
227 
228         return _channels.get(channelIndex);
229     }
230 
231     protected void initBindAddress() throws IOException {
232         String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
233 
234         if (Validator.isNull(autodetectAddress)) {
235             return;
236         }
237 
238         String host = autodetectAddress;
239         int port = 80;
240 
241         int index = autodetectAddress.indexOf(StringPool.COLON);
242 
243         if (index != -1) {
244             host = autodetectAddress.substring(0, index);
245             port = GetterUtil.getInteger(
246                 autodetectAddress.substring(index + 1), port);
247         }
248 
249         String bindAddress = SocketUtil.getHostAddress(host, port);
250 
251         System.setProperty("jgroups.bind_addr", bindAddress);
252 
253         if (_log.isInfoEnabled()) {
254             _log.info(
255                 "Set JGroups outgoing IP address to " + bindAddress + "}");
256         }
257     }
258 
259     protected void initChannels() throws ChannelException {
260         Properties properties = PropsUtil.getProperties(
261             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES, true);
262 
263         _channelCount = properties.size();
264 
265         if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
266             throw new IllegalArgumentException(
267                 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
268         }
269 
270         _addresses = new ArrayList<org.jgroups.Address>(_channelCount);
271         _channels = new ArrayList<JChannel>(_channelCount);
272 
273         List<String> keys = new ArrayList<String>(_channelCount);
274 
275         for (Object key : properties.keySet()) {
276             keys.add((String)key);
277         }
278 
279         Collections.sort(keys);
280 
281         for (int i = 0; i < keys.size(); i++) {
282             String customName = keys.get(i);
283 
284             String value = properties.getProperty(customName);
285 
286             JChannel channel = createChannel(i, value);
287 
288             _addresses.add(channel.getLocalAddress());
289             _channels.add(channel);
290         }
291     }
292 
293     protected void initSystemProperties() {
294         for (String systemProperty :
295                 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
296 
297             int index = systemProperty.indexOf(StringPool.COLON);
298 
299             if (index == -1) {
300                 continue;
301             }
302 
303             String key = systemProperty.substring(0, index);
304             String value = systemProperty.substring(index + 1);
305 
306             System.setProperty(key, value);
307 
308             if (_log.isDebugEnabled()) {
309                 _log.debug(
310                     "Setting system property {key=" + key + ", value=" +
311                         value + "}");
312             }
313         }
314     }
315 
316     private static final String _LIFERAY_CHANNEL = "LIFERAY-CHANNEL-";
317 
318     private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
319 
320     private static final Log _log =
321         LogFactoryUtil.getLog(ClusterLinkImpl.class);
322 
323     private List<org.jgroups.Address> _addresses;
324     private int _channelCount;
325     private List<JChannel> _channels;
326     private ClusterForwardMessageListener _clusterForwardMessageListener;
327 
328 }