001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.cluster;
016    
017    import java.util.HashSet;
018    import java.util.List;
019    import java.util.Set;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.CancellationException;
022    import java.util.concurrent.CountDownLatch;
023    import java.util.concurrent.Future;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.TimeoutException;
026    
027    /**
028     * @author Tina Tian
029     */
030    public class FutureClusterResponses implements Future<ClusterNodeResponses> {
031    
032            public FutureClusterResponses(List<Address> addresses) {
033                    _clusterNodeResponses = new ClusterNodeResponses();
034                    _countDownLatch = new CountDownLatch(addresses.size());
035                    _expectedReplyAddress = new HashSet<Address>(addresses);
036            }
037    
038            public void addClusterNodeResponse(
039                    ClusterNodeResponse clusterNodeResponse) {
040    
041                    _clusterNodeResponses.addClusterResponse(clusterNodeResponse);
042    
043                    _countDownLatch.countDown();
044            }
045    
046            public void addExpectedReplyAddress(Address address) {
047                    _expectedReplyAddress.add(address);
048            }
049    
050            @Override
051            public boolean cancel(boolean mayInterruptIfRunning) {
052                    if (_cancelled || isDone()) {
053                            return false;
054                    }
055    
056                    _cancelled = true;
057    
058                    return true;
059            }
060    
061            public boolean expectsReply(Address address) {
062                    return _expectedReplyAddress.contains(address);
063            }
064    
065            @Override
066            public ClusterNodeResponses get() throws InterruptedException {
067                    if (_cancelled) {
068                            throw new CancellationException();
069                    }
070    
071                    _countDownLatch.await();
072    
073                    return _clusterNodeResponses;
074            }
075    
076            @Override
077            public ClusterNodeResponses get(long timeout, TimeUnit timeUnit)
078                    throws InterruptedException, TimeoutException {
079    
080                    if (_cancelled) {
081                            throw new CancellationException();
082                    }
083    
084                    if (_countDownLatch.await(timeout, timeUnit)) {
085                            return _clusterNodeResponses;
086                    }
087                    else {
088                            throw new TimeoutException();
089                    }
090            }
091    
092            public BlockingQueue<ClusterNodeResponse> getPartialResults() {
093                    return _clusterNodeResponses.getClusterResponses();
094            }
095    
096            @Override
097            public boolean isCancelled() {
098                    return _cancelled;
099            }
100    
101            @Override
102            public boolean isDone() {
103                    if ((_countDownLatch.getCount() == 0) || _cancelled) {
104                            return true;
105                    }
106                    else {
107                            return false;
108                    }
109            }
110    
111            private boolean _cancelled;
112            private ClusterNodeResponses _clusterNodeResponses;
113            private CountDownLatch _countDownLatch;
114            private Set<Address> _expectedReplyAddress;
115    
116    }