001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.poller;
016    
017    import com.liferay.portal.kernel.json.JSONArray;
018    import com.liferay.portal.kernel.messaging.Message;
019    import com.liferay.portal.kernel.messaging.MessageBusUtil;
020    import com.liferay.portal.kernel.messaging.MessageListener;
021    import com.liferay.portal.kernel.poller.PollerRequest;
022    import com.liferay.portal.kernel.poller.PollerResponse;
023    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
024    
025    import java.util.HashMap;
026    import java.util.Map;
027    
028    /**
029     * @author Michael C. Han
030     */
031    public class PollerRequestManager implements MessageListener {
032    
033            public PollerRequestManager(
034                    JSONArray pollerResponseChunksJSON, String destinationName,
035                    String responseDestinationName, long timeout) {
036    
037                    _pollerResponseChunksJSON = pollerResponseChunksJSON;
038                    _destinationName = destinationName;
039                    _responseDestinationName = responseDestinationName;
040                    _timeout = timeout;
041            }
042    
043            public void addPollerRequest(PollerRequest pollerRequest) {
044                    if (pollerRequest == null) {
045                            return;
046                    }
047    
048                    _pollerRequests.put(pollerRequest.getPortletId(), pollerRequest);
049            }
050    
051            public void clearRequests() {
052                    _pollerRequests.clear();
053                    _responseIds.clear();
054                    _responseCount = 0;
055            }
056    
057            public JSONArray getPollerResponse() {
058                    return _pollerResponseChunksJSON;
059            }
060    
061            public void processRequests() {
062                    MessageBusUtil.registerMessageListener(_responseDestinationName, this);
063    
064                    try {
065                            for (PollerRequest pollerRequest : _pollerRequests.values()) {
066                                    Message message = new Message();
067    
068                                    message.setPayload(pollerRequest);
069                                    message.setResponseDestinationName(_responseDestinationName);
070    
071                                    String responseId = PortalUUIDUtil.generate();
072    
073                                    message.setResponseId(responseId);
074    
075                                    _responseIds.put(responseId, responseId);
076    
077                                    MessageBusUtil.sendMessage(_destinationName, message);
078                            }
079    
080                            synchronized (this) {
081                                    if (_responseCount != _pollerRequests.size()) {
082                                            try {
083                                                    this.wait(_timeout);
084                                            }
085                                            catch (InterruptedException ie) {
086                                            }
087                                    }
088                            }
089                    }
090                    finally {
091                            MessageBusUtil.unregisterMessageListener(
092                                    _responseDestinationName, this);
093                    }
094            }
095    
096            public void receive(Message message) {
097                    if (!_responseIds.containsKey(message.getResponseId())) {
098                            return;
099                    }
100    
101                    if (_pollerResponseChunksJSON != null) {
102                            PollerResponse pollerResponse =
103                                    (PollerResponse)message.getPayload();
104    
105                            if (pollerResponse != null) {
106                                    _pollerResponseChunksJSON.put(pollerResponse.toJSONObject());
107                            }
108                    }
109    
110                    synchronized (this) {
111                            _responseCount++;
112    
113                            if (_responseCount == _pollerRequests.size()) {
114                                    notify();
115                            }
116                    }
117            }
118    
119            private String _destinationName;
120            private Map<String, PollerRequest> _pollerRequests =
121                    new HashMap<String, PollerRequest>();
122            private JSONArray _pollerResponseChunksJSON;
123            private int _responseCount;
124            private String _responseDestinationName;
125            private Map<String, String> _responseIds = new HashMap<String, String>();
126            private long _timeout;
127    
128    }