1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.kernel.cluster.Address;
18  import com.liferay.portal.kernel.cluster.ClusterException;
19  import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
20  import com.liferay.portal.kernel.cluster.ClusterRequest;
21  import com.liferay.portal.kernel.cluster.ClusterResponse;
22  import com.liferay.portal.kernel.log.Log;
23  import com.liferay.portal.kernel.log.LogFactoryUtil;
24  import com.liferay.portal.kernel.util.MethodInvoker;
25  import com.liferay.portal.kernel.util.MethodWrapper;
26  
27  import java.io.Serializable;
28  
29  import java.util.Map;
30  import java.util.concurrent.Future;
31  
32  import org.jgroups.ChannelException;
33  import org.jgroups.JChannel;
34  import org.jgroups.Message;
35  import org.jgroups.ReceiverAdapter;
36  import org.jgroups.View;
37  
38  /**
39   * <a href="ClusterInvokeReceiver.java.html"><b><i>View Source</i></b></a>
40   *
41   * @author Shuyang Zhou
42   * @author Tina Tian
43   */
44  public class ClusterInvokeReceiver extends ReceiverAdapter {
45  
46      public ClusterInvokeReceiver(
47          Map<String, Map<Address, Future<?>>> multicastResultMap,
48          Map<String, Future<?>> unicastResultMap) {
49  
50          _multicastResultMap = multicastResultMap;
51          _unicastResultMap = unicastResultMap;
52      }
53  
54      public void receive(Message message) {
55          org.jgroups.Address sourceAddress = message.getSrc();
56          org.jgroups.Address localAddress = _channel.getLocalAddress();
57  
58          Object obj = message.getObject();
59  
60          if (obj == null) {
61              if (_log.isWarnEnabled()) {
62                  _log.warn("Message content is null");
63              }
64  
65              return;
66          }
67  
68          if (localAddress.equals(sourceAddress) &&
69              ClusterExecutorUtil.isShortcutLocalMethod()) {
70  
71              return;
72          }
73  
74          if (obj instanceof ClusterRequest) {
75              ClusterRequest clusterRequest = (ClusterRequest)obj;
76  
77              ClusterResponse clusterResponse = new ClusterResponseImpl();
78  
79              clusterResponse.setMulticast(clusterRequest.isMulticast());
80              clusterResponse.setUuid(clusterRequest.getUuid());
81  
82              Object payload = clusterRequest.getPayload();
83  
84              if (payload instanceof MethodWrapper) {
85                  MethodWrapper methodWrapper = (MethodWrapper)payload;
86  
87                  try {
88                      Object returnValue = MethodInvoker.invoke(methodWrapper);
89  
90                      if (returnValue instanceof Serializable) {
91                          clusterResponse.setResult(returnValue);
92                      }
93                      else if (returnValue != null) {
94                          clusterResponse.setException(
95                              new ClusterException(
96                                  "Return value is not serializable"));
97                      }
98                  }
99                  catch (Exception e) {
100                     clusterResponse.setException(e);
101                 }
102             }
103             else {
104                 clusterResponse.setException(
105                     new ClusterException(
106                         "Payload is not of type " +
107                             MethodWrapper.class.getName()));
108             }
109 
110             try {
111                 _channel.send(sourceAddress, localAddress, clusterResponse);
112             }
113             catch (ChannelException ce) {
114                 _log.error(
115                     "Unable to send response message " + clusterResponse, ce);
116             }
117         }
118         else if (obj instanceof ClusterResponse) {
119             ClusterResponse clusterResponse = (ClusterResponse)obj;
120 
121             String uuid = clusterResponse.getUuid();
122 
123             if (clusterResponse.isMulticast() &&
124                 _multicastResultMap.containsKey(uuid)) {
125 
126                 Map<Address, Future<?>> results = _multicastResultMap.get(uuid);
127 
128                 Address address = new AddressImpl(sourceAddress);
129 
130                 if (results.containsKey(address)) {
131                     FutureResult<Object> futureResult =
132                         (FutureResult<Object>)results.get(address);
133 
134                     if (clusterResponse.hasException()) {
135                         futureResult.setException(
136                             clusterResponse.getException());
137                     }
138                     else {
139                         futureResult.setResult(clusterResponse.getResult());
140                     }
141                 }
142                 else {
143                     _log.error("New node coming from " + sourceAddress);
144                 }
145             }
146             else if (_unicastResultMap.containsKey(uuid)) {
147                 FutureResult<Object> futureResult =
148                     (FutureResult<Object>)_unicastResultMap.get(uuid);
149 
150                 if (clusterResponse.hasException()) {
151                     futureResult.setException(clusterResponse.getException());
152                 }
153                 else {
154                     futureResult.setResult(clusterResponse.getResult());
155                 }
156             }
157             else {
158                 _log.error("Unknow UUID " + uuid + " from " + sourceAddress);
159             }
160         }
161         else {
162             if (_log.isWarnEnabled()) {
163                 _log.warn(
164                     "Unable to process message content of type " +
165                         obj.getClass().getName());
166             }
167 
168             return;
169         }
170     }
171 
172     public void setChannel(JChannel channel) {
173         _channel = channel;
174     }
175 
176     public void viewAccepted(View view) {
177         if (_log.isInfoEnabled()) {
178             _log.info("Accepted view " + view);
179         }
180     }
181 
182     private static Log _log = LogFactoryUtil.getLog(
183         ClusterInvokeReceiver.class);
184 
185     private JChannel _channel;
186     private Map<String, Map<Address, Future<?>>> _multicastResultMap;
187     private Map<String, Future<?>> _unicastResultMap;
188 
189 }