001
014
015 package com.liferay.portal.kernel.cluster;
016
017 import java.util.HashSet;
018 import java.util.List;
019 import java.util.Set;
020 import java.util.concurrent.BlockingQueue;
021 import java.util.concurrent.CancellationException;
022 import java.util.concurrent.CountDownLatch;
023 import java.util.concurrent.Future;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.TimeoutException;
026
027
030 public class FutureClusterResponses implements Future<ClusterNodeResponses> {
031
032 public FutureClusterResponses(List<Address> addresses) {
033 _clusterNodeResponses = new ClusterNodeResponses();
034 _countDownLatch = new CountDownLatch(addresses.size());
035 _expectedReplyAddress = new HashSet<Address>(addresses);
036 }
037
038 public void addClusterNodeResponse(
039 ClusterNodeResponse clusterNodeResponse) {
040
041 _clusterNodeResponses.addClusterResponse(clusterNodeResponse);
042
043 _countDownLatch.countDown();
044 }
045
046 public void addExpectedReplyAddress(Address address) {
047 _expectedReplyAddress.add(address);
048 }
049
050 @Override
051 public boolean cancel(boolean mayInterruptIfRunning) {
052 if (_cancelled || isDone()) {
053 return false;
054 }
055
056 _cancelled = true;
057
058 return true;
059 }
060
061 public boolean expectsReply(Address address) {
062 return _expectedReplyAddress.contains(address);
063 }
064
065 @Override
066 public ClusterNodeResponses get() throws InterruptedException {
067 if (_cancelled) {
068 throw new CancellationException();
069 }
070
071 _countDownLatch.await();
072
073 return _clusterNodeResponses;
074 }
075
076 @Override
077 public ClusterNodeResponses get(long timeout, TimeUnit timeUnit)
078 throws InterruptedException, TimeoutException {
079
080 if (_cancelled) {
081 throw new CancellationException();
082 }
083
084 if (_countDownLatch.await(timeout, timeUnit)) {
085 return _clusterNodeResponses;
086 }
087 else {
088 throw new TimeoutException();
089 }
090 }
091
092 public BlockingQueue<ClusterNodeResponse> getPartialResults() {
093 return _clusterNodeResponses.getClusterResponses();
094 }
095
096 @Override
097 public boolean isCancelled() {
098 return _cancelled;
099 }
100
101 @Override
102 public boolean isDone() {
103 if ((_countDownLatch.getCount() == 0) || _cancelled) {
104 return true;
105 }
106 else {
107 return false;
108 }
109 }
110
111 private boolean _cancelled;
112 private ClusterNodeResponses _clusterNodeResponses;
113 private CountDownLatch _countDownLatch;
114 private Set<Address> _expectedReplyAddress;
115
116 }