001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.blocking;
016    
017    import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
018    import com.liferay.portal.kernel.nio.intraband.ChannelContext;
019    import com.liferay.portal.kernel.nio.intraband.Datagram;
020    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021    import com.liferay.portal.kernel.util.NamedThreadFactory;
022    
023    import java.io.IOException;
024    
025    import java.nio.channels.Channel;
026    import java.nio.channels.GatheringByteChannel;
027    import java.nio.channels.ScatteringByteChannel;
028    import java.nio.channels.SelectableChannel;
029    
030    import java.util.Queue;
031    import java.util.concurrent.BlockingQueue;
032    import java.util.concurrent.Callable;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.ExecutorService;
035    import java.util.concurrent.Executors;
036    import java.util.concurrent.Future;
037    import java.util.concurrent.LinkedBlockingQueue;
038    import java.util.concurrent.ThreadFactory;
039    import java.util.concurrent.TimeUnit;
040    
041    /**
042     * @author Shuyang Zhou
043     */
044    public class ExecutorIntraband extends BaseIntraband {
045    
046            public ExecutorIntraband(long defaultTimeout) {
047                    super(defaultTimeout);
048            }
049    
050            @Override
051            public void close() throws InterruptedException, IOException {
052                    executorService.shutdownNow();
053    
054                    executorService.awaitTermination(defaultTimeout, TimeUnit.MILLISECONDS);
055    
056                    super.close();
057            }
058    
059            @Override
060            public RegistrationReference registerChannel(Channel channel) {
061                    if (channel == null) {
062                            throw new NullPointerException("Channel is null");
063                    }
064    
065                    if (!(channel instanceof GatheringByteChannel)) {
066                            throw new IllegalArgumentException(
067                                    "Channel is not of type GatheringByteChannel");
068                    }
069    
070                    if (!(channel instanceof ScatteringByteChannel)) {
071                            throw new IllegalArgumentException(
072                                    "Channel is not of type ScatteringByteChannel");
073                    }
074    
075                    if (channel instanceof SelectableChannel) {
076                            SelectableChannel selectableChannel = (SelectableChannel)channel;
077    
078                            if (!selectableChannel.isBlocking()) {
079                                    throw new IllegalArgumentException(
080                                            "Channel is of type SelectableChannel and " +
081                                                    "configured in nonblocking mode");
082                            }
083                    }
084    
085                    ensureOpen();
086    
087                    return doRegisterChannel(
088                            (ScatteringByteChannel)channel, (GatheringByteChannel)channel);
089            }
090    
091            @Override
092            public RegistrationReference registerChannel(
093                    ScatteringByteChannel scatteringByteChannel,
094                    GatheringByteChannel gatheringByteChannel) {
095    
096                    if (gatheringByteChannel == null) {
097                            throw new NullPointerException("Gathering byte channel is null");
098                    }
099    
100                    if (scatteringByteChannel == null) {
101                            throw new NullPointerException("Scattering byte channel is null");
102                    }
103    
104                    if (scatteringByteChannel instanceof SelectableChannel) {
105                            SelectableChannel selectableChannel =
106                                    (SelectableChannel)scatteringByteChannel;
107    
108                            if (!selectableChannel.isBlocking()) {
109                                    throw new IllegalArgumentException(
110                                            "Scattering byte channel is of type SelectableChannel " +
111                                                    "and configured in nonblocking mode");
112                            }
113                    }
114    
115                    if (gatheringByteChannel instanceof SelectableChannel) {
116                            SelectableChannel selectableChannel =
117                                    (SelectableChannel)gatheringByteChannel;
118    
119                            if (!selectableChannel.isBlocking()) {
120                                    throw new IllegalArgumentException(
121                                            "Gathering byte channel is of type SelectableChannel and " +
122                                                    "configured in nonblocking mode");
123                            }
124                    }
125    
126                    ensureOpen();
127    
128                    return doRegisterChannel(scatteringByteChannel, gatheringByteChannel);
129            }
130    
131            protected RegistrationReference doRegisterChannel(
132                    ScatteringByteChannel scatteringByteChannel,
133                    GatheringByteChannel gatheringByteChannel) {
134    
135                    BlockingQueue<Datagram> sendingQueue =
136                            new LinkedBlockingQueue<Datagram>();
137    
138                    ChannelContext channelContext = new ChannelContext(sendingQueue);
139    
140                    ReadingCallable readingCallable = new ReadingCallable(
141                            scatteringByteChannel, channelContext);
142                    WritingCallable writingCallable = new WritingCallable(
143                            gatheringByteChannel, channelContext);
144    
145                    // Submit the polling jobs, no dispatch will happen until latches are
146                    // open. This ensures a thread safe publication of
147                    // ChannelContext#_registrationReference.
148    
149                    Future<Void> readFuture = executorService.submit(readingCallable);
150                    Future<Void> writeFuture = executorService.submit(writingCallable);
151    
152                    FutureRegistrationReference futureRegistrationReference =
153                            new FutureRegistrationReference(
154                                    this, channelContext, readFuture, writeFuture);
155    
156                    channelContext.setRegistrationReference(futureRegistrationReference);
157    
158                    readingCallable.openLatch();
159                    writingCallable.openLatch();
160    
161                    return futureRegistrationReference;
162            }
163    
164            @Override
165            protected void doSendDatagram(
166                    RegistrationReference registrationReference, Datagram datagram) {
167    
168                    FutureRegistrationReference futureRegistrationReference =
169                            (FutureRegistrationReference)registrationReference;
170    
171                    ChannelContext channelContext =
172                            futureRegistrationReference.channelContext;
173    
174                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
175    
176                    sendingQueue.offer(datagram);
177            }
178    
179            protected static final ThreadFactory THREAD_FACTORY =
180                    new NamedThreadFactory(
181                            ExecutorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
182                            ExecutorIntraband.class.getClassLoader());
183    
184            protected final ExecutorService executorService =
185                    Executors.newCachedThreadPool(THREAD_FACTORY);
186    
187            protected class ReadingCallable implements Callable<Void> {
188    
189                    public ReadingCallable(
190                            ScatteringByteChannel scatteringByteChannel,
191                            ChannelContext channelContext) {
192    
193                            _scatteringByteChannel = scatteringByteChannel;
194                            _channelContext = channelContext;
195    
196                            _countDownLatch = new CountDownLatch(1);
197                    }
198    
199                    @Override
200                    public Void call() throws Exception {
201                            _countDownLatch.await();
202    
203                            while (_scatteringByteChannel.isOpen()) {
204                                    handleReading(_scatteringByteChannel, _channelContext);
205                            }
206    
207                            return null;
208                    }
209    
210                    public void openLatch() {
211                            _countDownLatch.countDown();
212                    }
213    
214                    private final ChannelContext _channelContext;
215                    private final CountDownLatch _countDownLatch;
216                    private final ScatteringByteChannel _scatteringByteChannel;
217    
218            }
219    
220            protected class WritingCallable implements Callable<Void> {
221    
222                    public WritingCallable(
223                            GatheringByteChannel gatheringByteChannel,
224                            ChannelContext channelContext) {
225    
226                            _gatheringByteChannel = gatheringByteChannel;
227                            _channelContext = channelContext;
228    
229                            _countDownLatch = new CountDownLatch(1);
230                    }
231    
232                    @Override
233                    public Void call() throws Exception {
234                            _countDownLatch.await();
235    
236                            try {
237                                    BlockingQueue<Datagram> sendingQueue =
238                                            (BlockingQueue<Datagram>)_channelContext.getSendingQueue();
239    
240                                    while (true) {
241                                            Datagram datagram = sendingQueue.take();
242    
243                                            _channelContext.setWritingDatagram(datagram);
244    
245                                            if (!handleWriting(
246                                                            _gatheringByteChannel, _channelContext)) {
247    
248                                                    if (_gatheringByteChannel.isOpen()) {
249    
250                                                            // Still open but no longer writable, typical
251                                                            // behavior of nonblocking channel
252    
253                                                            throw new IllegalStateException(
254                                                                    _gatheringByteChannel +
255                                                                            " behaved in nonblocking way.");
256                                                    }
257                                                    else {
258                                                            break;
259                                                    }
260                                            }
261    
262                                            cleanUpTimeoutResponseWaitingDatagrams();
263                                    }
264                            }
265                            catch (InterruptedException ie) {
266                            }
267    
268                            return null;
269                    }
270    
271                    public void openLatch() {
272                            _countDownLatch.countDown();
273                    }
274    
275                    private final ChannelContext _channelContext;
276                    private final CountDownLatch _countDownLatch;
277                    private final GatheringByteChannel _gatheringByteChannel;
278    
279            }
280    
281    }