001
014
015 package com.liferay.portal.liveusers.messaging;
016
017 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
018 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
019 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
020 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
021 import com.liferay.portal.kernel.cluster.ClusterRequest;
022 import com.liferay.portal.kernel.json.JSONFactoryUtil;
023 import com.liferay.portal.kernel.json.JSONObject;
024 import com.liferay.portal.kernel.log.Log;
025 import com.liferay.portal.kernel.log.LogFactoryUtil;
026 import com.liferay.portal.kernel.messaging.BaseMessageListener;
027 import com.liferay.portal.kernel.messaging.Message;
028 import com.liferay.portal.kernel.util.MethodHandler;
029 import com.liferay.portal.kernel.util.MethodKey;
030 import com.liferay.portal.liveusers.LiveUsers;
031
032 import java.util.Map;
033 import java.util.Set;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.TimeoutException;
036
037
041 public class LiveUsersMessageListener extends BaseMessageListener {
042
043 protected void doCommandAddClusterNode(JSONObject jsonObject)
044 throws Exception {
045
046 String clusterNodeId = jsonObject.getString("clusterNodeId");
047
048 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
049 _getLocalClusterUsersMethodHandler, clusterNodeId);
050
051 ClusterExecutorUtil.execute(
052 clusterRequest, new LiveUsersClusterResponseCallback(clusterNodeId),
053 20000, TimeUnit.MILLISECONDS);
054 }
055
056 protected void doCommandRemoveClusterNode(JSONObject jsonObject)
057 throws Exception {
058
059 String clusterNodeId = jsonObject.getString("clusterNodeId");
060
061 LiveUsers.removeClusterNode(clusterNodeId);
062 }
063
064 protected void doCommandSignIn(JSONObject jsonObject) throws Exception {
065 String clusterNodeId = jsonObject.getString("clusterNodeId");
066 long companyId = jsonObject.getLong("companyId");
067 long userId = jsonObject.getLong("userId");
068 String sessionId = jsonObject.getString("sessionId");
069 String remoteAddr = jsonObject.getString("remoteAddr");
070 String remoteHost = jsonObject.getString("remoteHost");
071 String userAgent = jsonObject.getString("userAgent");
072
073 LiveUsers.signIn(
074 clusterNodeId, companyId, userId, sessionId, remoteAddr, remoteHost,
075 userAgent);
076 }
077
078 protected void doCommandSignOut(JSONObject jsonObject) throws Exception {
079 String clusterNodeId = jsonObject.getString("clusterNodeId");
080 long companyId = jsonObject.getLong("companyId");
081 long userId = jsonObject.getLong("userId");
082 String sessionId = jsonObject.getString("sessionId");
083
084 LiveUsers.signOut(clusterNodeId, companyId, userId, sessionId);
085 }
086
087 @Override
088 protected void doReceive(Message message) throws Exception {
089 String payload = (String)message.getPayload();
090
091 JSONObject jsonObject = JSONFactoryUtil.createJSONObject(payload);
092
093 String command = jsonObject.getString("command");
094
095 if (command.equals("addClusterNode")) {
096 doCommandAddClusterNode(jsonObject);
097 }
098 else if (command.equals("removeClusterNode")) {
099 doCommandRemoveClusterNode(jsonObject);
100 }
101 else if (command.equals("signIn")) {
102 doCommandSignIn(jsonObject);
103 }
104 else if (command.equals("signOut")) {
105 doCommandSignOut(jsonObject);
106 }
107 }
108
109 private static Log _log = LogFactoryUtil.getLog(
110 LiveUsersMessageListener.class);
111
112 private static MethodHandler _getLocalClusterUsersMethodHandler =
113 new MethodHandler(
114 new MethodKey(LiveUsers.class.getName(), "getLocalClusterUsers"));
115
116 private class LiveUsersClusterResponseCallback
117 extends BaseClusterResponseCallback {
118
119 public LiveUsersClusterResponseCallback(String clusterNodeId) {
120 _clusterNodeId = clusterNodeId;
121 }
122
123 @Override
124 public void callback(ClusterNodeResponses clusterNodeResponses) {
125 ClusterNodeResponse clusterNodeResponse =
126 clusterNodeResponses.getClusterResponse(_clusterNodeId);
127
128 try {
129 Object result = clusterNodeResponse.getResult();
130
131 if (result == null) {
132 return;
133 }
134
135 Map<Long, Map<Long, Set<String>>> clusterUsers =
136 (Map<Long, Map<Long, Set<String>>>)result;
137
138 LiveUsers.addClusterNode(_clusterNodeId, clusterUsers);
139 }
140 catch (Exception e) {
141 _log.error("Unable to add cluster node " + _clusterNodeId, e);
142 }
143 }
144
145 @Override
146 public void processTimeoutException(TimeoutException timeoutException) {
147 _log.error(
148 "Uanble to add cluster node " + _clusterNodeId,
149 timeoutException);
150 }
151
152 private String _clusterNodeId;
153
154 }
155
156 }