001
014
015 package com.liferay.portal.poller;
016
017 import com.liferay.portal.kernel.json.JSONArray;
018 import com.liferay.portal.kernel.messaging.Message;
019 import com.liferay.portal.kernel.messaging.MessageBusUtil;
020 import com.liferay.portal.kernel.messaging.MessageListener;
021 import com.liferay.portal.kernel.poller.PollerRequest;
022 import com.liferay.portal.kernel.poller.PollerResponse;
023 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
024
025 import java.util.HashMap;
026 import java.util.Map;
027
028
031 public class PollerRequestManager implements MessageListener {
032
033 public PollerRequestManager(
034 JSONArray pollerResponseChunksJSON, String destinationName,
035 String responseDestinationName, long timeout) {
036
037 _pollerResponseChunksJSON = pollerResponseChunksJSON;
038 _destinationName = destinationName;
039 _responseDestinationName = responseDestinationName;
040 _timeout = timeout;
041 }
042
043 public void addPollerRequest(PollerRequest pollerRequest) {
044 if (pollerRequest == null) {
045 return;
046 }
047
048 _pollerRequests.put(pollerRequest.getPortletId(), pollerRequest);
049 }
050
051 public void clearRequests() {
052 _pollerRequests.clear();
053 _responseIds.clear();
054 _responseCount = 0;
055 }
056
057 public JSONArray getPollerResponse() {
058 return _pollerResponseChunksJSON;
059 }
060
061 public void processRequests() {
062 MessageBusUtil.registerMessageListener(_responseDestinationName, this);
063
064 try {
065 for (PollerRequest pollerRequest : _pollerRequests.values()) {
066 Message message = new Message();
067
068 message.setPayload(pollerRequest);
069 message.setResponseDestinationName(_responseDestinationName);
070
071 String responseId = PortalUUIDUtil.generate();
072
073 message.setResponseId(responseId);
074
075 _responseIds.put(responseId, responseId);
076
077 MessageBusUtil.sendMessage(_destinationName, message);
078 }
079
080 synchronized (this) {
081 if (_responseCount != _pollerRequests.size()) {
082 try {
083 this.wait(_timeout);
084 }
085 catch (InterruptedException ie) {
086 }
087 }
088 }
089 }
090 finally {
091 MessageBusUtil.unregisterMessageListener(
092 _responseDestinationName, this);
093 }
094 }
095
096 public void receive(Message message) {
097 if (!_responseIds.containsKey(message.getResponseId())) {
098 return;
099 }
100
101 if (_pollerResponseChunksJSON != null) {
102 PollerResponse pollerResponse =
103 (PollerResponse)message.getPayload();
104
105 if (pollerResponse != null) {
106 _pollerResponseChunksJSON.put(pollerResponse.toJSONObject());
107 }
108 }
109
110 synchronized (this) {
111 _responseCount++;
112
113 if (_responseCount == _pollerRequests.size()) {
114 notify();
115 }
116 }
117 }
118
119 private String _destinationName;
120 private Map<String, PollerRequest> _pollerRequests =
121 new HashMap<String, PollerRequest>();
122 private JSONArray _pollerResponseChunksJSON;
123 private int _responseCount;
124 private String _responseDestinationName;
125 private Map<String, String> _responseIds = new HashMap<String, String>();
126 private long _timeout;
127
128 }