001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.liveusers.messaging;
016    
017    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
018    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
019    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
020    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
021    import com.liferay.portal.kernel.cluster.ClusterRequest;
022    import com.liferay.portal.kernel.json.JSONFactoryUtil;
023    import com.liferay.portal.kernel.json.JSONObject;
024    import com.liferay.portal.kernel.log.Log;
025    import com.liferay.portal.kernel.log.LogFactoryUtil;
026    import com.liferay.portal.kernel.messaging.BaseMessageListener;
027    import com.liferay.portal.kernel.messaging.Message;
028    import com.liferay.portal.kernel.util.MethodHandler;
029    import com.liferay.portal.kernel.util.MethodKey;
030    import com.liferay.portal.liveusers.LiveUsers;
031    
032    import java.util.Map;
033    import java.util.Set;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.TimeoutException;
036    
037    /**
038     * @author Brian Wing Shun Chan
039     * @author Amos Fong
040     */
041    public class LiveUsersMessageListener extends BaseMessageListener {
042    
043            protected void doCommandAddClusterNode(JSONObject jsonObject)
044                    throws Exception {
045    
046                    String clusterNodeId = jsonObject.getString("clusterNodeId");
047    
048                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
049                            _getLocalClusterUsersMethodHandler, clusterNodeId);
050    
051                    ClusterExecutorUtil.execute(
052                            clusterRequest, new LiveUsersClusterResponseCallback(clusterNodeId),
053                            20000, TimeUnit.MILLISECONDS);
054            }
055    
056            protected void doCommandRemoveClusterNode(JSONObject jsonObject)
057                    throws Exception {
058    
059                    String clusterNodeId = jsonObject.getString("clusterNodeId");
060    
061                    LiveUsers.removeClusterNode(clusterNodeId);
062            }
063    
064            protected void doCommandSignIn(JSONObject jsonObject) throws Exception {
065                    String clusterNodeId = jsonObject.getString("clusterNodeId");
066                    long companyId = jsonObject.getLong("companyId");
067                    long userId = jsonObject.getLong("userId");
068                    String sessionId = jsonObject.getString("sessionId");
069                    String remoteAddr = jsonObject.getString("remoteAddr");
070                    String remoteHost = jsonObject.getString("remoteHost");
071                    String userAgent = jsonObject.getString("userAgent");
072    
073                    LiveUsers.signIn(
074                            clusterNodeId, companyId, userId, sessionId, remoteAddr, remoteHost,
075                            userAgent);
076            }
077    
078            protected void doCommandSignOut(JSONObject jsonObject) throws Exception {
079                    String clusterNodeId = jsonObject.getString("clusterNodeId");
080                    long companyId = jsonObject.getLong("companyId");
081                    long userId = jsonObject.getLong("userId");
082                    String sessionId = jsonObject.getString("sessionId");
083    
084                    LiveUsers.signOut(clusterNodeId, companyId, userId, sessionId);
085            }
086    
087            @Override
088            protected void doReceive(Message message) throws Exception {
089                    String payload = (String)message.getPayload();
090    
091                    JSONObject jsonObject = JSONFactoryUtil.createJSONObject(payload);
092    
093                    String command = jsonObject.getString("command");
094    
095                    if (command.equals("addClusterNode")) {
096                            doCommandAddClusterNode(jsonObject);
097                    }
098                    else if (command.equals("removeClusterNode")) {
099                            doCommandRemoveClusterNode(jsonObject);
100                    }
101                    else if (command.equals("signIn")) {
102                            doCommandSignIn(jsonObject);
103                    }
104                    else if (command.equals("signOut")) {
105                            doCommandSignOut(jsonObject);
106                    }
107            }
108    
109            private static Log _log = LogFactoryUtil.getLog(
110                    LiveUsersMessageListener.class);
111    
112            private static MethodHandler _getLocalClusterUsersMethodHandler =
113                    new MethodHandler(
114                            new MethodKey(LiveUsers.class.getName(), "getLocalClusterUsers"));
115    
116            private class LiveUsersClusterResponseCallback
117                    extends BaseClusterResponseCallback {
118    
119                    public LiveUsersClusterResponseCallback(String clusterNodeId) {
120                            _clusterNodeId = clusterNodeId;
121                    }
122    
123                    @Override
124                    public void callback(ClusterNodeResponses clusterNodeResponses) {
125                            ClusterNodeResponse clusterNodeResponse =
126                                    clusterNodeResponses.getClusterResponse(_clusterNodeId);
127    
128                            try {
129                                    Object result = clusterNodeResponse.getResult();
130    
131                                    if (result == null) {
132                                            return;
133                                    }
134    
135                                    Map<Long, Map<Long, Set<String>>> clusterUsers =
136                                            (Map<Long, Map<Long, Set<String>>>)result;
137    
138                                    LiveUsers.addClusterNode(_clusterNodeId, clusterUsers);
139                            }
140                            catch (Exception e) {
141                                    _log.error("Unable to add cluster node " + _clusterNodeId, e);
142                            }
143                    }
144    
145                    @Override
146                    public void processTimeoutException(TimeoutException timeoutException) {
147                            _log.error(
148                                    "Uanble to add cluster node " + _clusterNodeId,
149                                    timeoutException);
150                    }
151    
152                    private String _clusterNodeId;
153    
154            }
155    
156    }