1
14
15 package com.liferay.portal.cluster;
16
17 import com.liferay.portal.kernel.cluster.Address;
18 import com.liferay.portal.kernel.cluster.ClusterException;
19 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
20 import com.liferay.portal.kernel.cluster.ClusterRequest;
21 import com.liferay.portal.kernel.cluster.ClusterResponse;
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.MethodInvoker;
25 import com.liferay.portal.kernel.util.MethodWrapper;
26
27 import java.io.Serializable;
28
29 import java.util.Map;
30 import java.util.concurrent.Future;
31
32 import org.jgroups.ChannelException;
33 import org.jgroups.JChannel;
34 import org.jgroups.Message;
35 import org.jgroups.ReceiverAdapter;
36 import org.jgroups.View;
37
38
44 public class ClusterInvokeReceiver extends ReceiverAdapter {
45
46 public ClusterInvokeReceiver(
47 Map<String, Map<Address, Future<?>>> multicastResultMap,
48 Map<String, Future<?>> unicastResultMap) {
49
50 _multicastResultMap = multicastResultMap;
51 _unicastResultMap = unicastResultMap;
52 }
53
54 public void receive(Message message) {
55 org.jgroups.Address sourceAddress = message.getSrc();
56 org.jgroups.Address localAddress = _channel.getLocalAddress();
57
58 Object obj = message.getObject();
59
60 if (obj == null) {
61 if (_log.isWarnEnabled()) {
62 _log.warn("Message content is null");
63 }
64
65 return;
66 }
67
68 if (localAddress.equals(sourceAddress) &&
69 ClusterExecutorUtil.isShortcutLocalMethod()) {
70
71 return;
72 }
73
74 if (obj instanceof ClusterRequest) {
75 ClusterRequest clusterRequest = (ClusterRequest)obj;
76
77 ClusterResponse clusterResponse = new ClusterResponseImpl();
78
79 clusterResponse.setMulticast(clusterRequest.isMulticast());
80 clusterResponse.setUuid(clusterRequest.getUuid());
81
82 Object payload = clusterRequest.getPayload();
83
84 if (payload instanceof MethodWrapper) {
85 MethodWrapper methodWrapper = (MethodWrapper)payload;
86
87 try {
88 Object returnValue = MethodInvoker.invoke(methodWrapper);
89
90 if (returnValue instanceof Serializable) {
91 clusterResponse.setResult(returnValue);
92 }
93 else if (returnValue != null) {
94 clusterResponse.setException(
95 new ClusterException(
96 "Return value is not serializable"));
97 }
98 }
99 catch (Exception e) {
100 clusterResponse.setException(e);
101 }
102 }
103 else {
104 clusterResponse.setException(
105 new ClusterException(
106 "Payload is not of type " +
107 MethodWrapper.class.getName()));
108 }
109
110 try {
111 _channel.send(sourceAddress, localAddress, clusterResponse);
112 }
113 catch (ChannelException ce) {
114 _log.error(
115 "Unable to send response message " + clusterResponse, ce);
116 }
117 }
118 else if (obj instanceof ClusterResponse) {
119 ClusterResponse clusterResponse = (ClusterResponse)obj;
120
121 String uuid = clusterResponse.getUuid();
122
123 if (clusterResponse.isMulticast() &&
124 _multicastResultMap.containsKey(uuid)) {
125
126 Map<Address, Future<?>> results = _multicastResultMap.get(uuid);
127
128 Address address = new AddressImpl(sourceAddress);
129
130 if (results.containsKey(address)) {
131 FutureResult<Object> futureResult =
132 (FutureResult<Object>)results.get(address);
133
134 if (clusterResponse.hasException()) {
135 futureResult.setException(
136 clusterResponse.getException());
137 }
138 else {
139 futureResult.setResult(clusterResponse.getResult());
140 }
141 }
142 else {
143 _log.error("New node coming from " + sourceAddress);
144 }
145 }
146 else if (_unicastResultMap.containsKey(uuid)) {
147 FutureResult<Object> futureResult =
148 (FutureResult<Object>)_unicastResultMap.get(uuid);
149
150 if (clusterResponse.hasException()) {
151 futureResult.setException(clusterResponse.getException());
152 }
153 else {
154 futureResult.setResult(clusterResponse.getResult());
155 }
156 }
157 else {
158 _log.error("Unknow UUID " + uuid + " from " + sourceAddress);
159 }
160 }
161 else {
162 if (_log.isWarnEnabled()) {
163 _log.warn(
164 "Unable to process message content of type " +
165 obj.getClass().getName());
166 }
167
168 return;
169 }
170 }
171
172 public void setChannel(JChannel channel) {
173 _channel = channel;
174 }
175
176 public void viewAccepted(View view) {
177 if (_log.isInfoEnabled()) {
178 _log.info("Accepted view " + view);
179 }
180 }
181
182 private static Log _log = LogFactoryUtil.getLog(
183 ClusterInvokeReceiver.class);
184
185 private JChannel _channel;
186 private Map<String, Map<Address, Future<?>>> _multicastResultMap;
187 private Map<String, Future<?>> _unicastResultMap;
188
189 }