001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.nonblocking;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
020    import com.liferay.portal.kernel.nio.intraband.ChannelContext;
021    import com.liferay.portal.kernel.nio.intraband.Datagram;
022    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
023    import com.liferay.portal.kernel.util.NamedThreadFactory;
024    
025    import java.io.IOException;
026    
027    import java.nio.channels.CancelledKeyException;
028    import java.nio.channels.Channel;
029    import java.nio.channels.ClosedSelectorException;
030    import java.nio.channels.GatheringByteChannel;
031    import java.nio.channels.ScatteringByteChannel;
032    import java.nio.channels.SelectableChannel;
033    import java.nio.channels.SelectionKey;
034    import java.nio.channels.Selector;
035    
036    import java.util.Iterator;
037    import java.util.Queue;
038    import java.util.Set;
039    import java.util.concurrent.Callable;
040    import java.util.concurrent.ConcurrentLinkedQueue;
041    import java.util.concurrent.FutureTask;
042    import java.util.concurrent.ThreadFactory;
043    
044    /**
045     * @author Shuyang Zhou
046     */
047    public class SelectorIntraband extends BaseIntraband {
048    
049            public SelectorIntraband(long defaultTimeout) throws IOException {
050                    super(defaultTimeout);
051    
052                    pollingThread.start();
053            }
054    
055            @Override
056            public void close() throws InterruptedException, IOException {
057                    selector.close();
058    
059                    pollingThread.interrupt();
060    
061                    pollingThread.join(defaultTimeout);
062    
063                    super.close();
064            }
065    
066            @Override
067            public RegistrationReference registerChannel(Channel channel)
068                    throws IOException {
069    
070                    if (channel == null) {
071                            throw new NullPointerException("Channel is null");
072                    }
073    
074                    if (!(channel instanceof GatheringByteChannel)) {
075                            throw new IllegalArgumentException(
076                                    "Channel is not of type GatheringByteChannel");
077                    }
078    
079                    if (!(channel instanceof ScatteringByteChannel)) {
080                            throw new IllegalArgumentException(
081                                    "Channel is not of type ScatteringByteChannel");
082                    }
083    
084                    if (!(channel instanceof SelectableChannel)) {
085                            throw new IllegalArgumentException(
086                                    "Channel is not of type SelectableChannel");
087                    }
088    
089                    SelectableChannel selectableChannel = (SelectableChannel)channel;
090    
091                    if ((selectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
092                            throw new IllegalArgumentException(
093                                    "Channel is not valid for reading");
094                    }
095    
096                    if ((selectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
097                            throw new IllegalArgumentException(
098                                    "Channel is not valid for writing");
099                    }
100    
101                    ensureOpen();
102    
103                    selectableChannel.configureBlocking(false);
104    
105                    FutureTask<RegistrationReference> registerFutureTask =
106                            new FutureTask<RegistrationReference>(
107                                    new RegisterCallable(selectableChannel, selectableChannel));
108    
109                    registerQueue.offer(registerFutureTask);
110    
111                    selector.wakeup();
112    
113                    try {
114                            return registerFutureTask.get();
115                    }
116                    catch (Exception e) {
117                            throw new IOException(e);
118                    }
119            }
120    
121            @Override
122            public RegistrationReference registerChannel(
123                            ScatteringByteChannel scatteringByteChannel,
124                            GatheringByteChannel gatheringByteChannel)
125                    throws IOException {
126    
127                    if (scatteringByteChannel == null) {
128                            throw new NullPointerException("Scattering byte channel is null");
129                    }
130    
131                    if (gatheringByteChannel == null) {
132                            throw new NullPointerException("Gathering byte channel is null");
133                    }
134    
135                    if (!(scatteringByteChannel instanceof SelectableChannel)) {
136                            throw new IllegalArgumentException(
137                                    "Scattering byte channel is not of type SelectableChannel");
138                    }
139    
140                    if (!(gatheringByteChannel instanceof SelectableChannel)) {
141                            throw new IllegalArgumentException(
142                                    "Gathering byte channel is not of type SelectableChannel");
143                    }
144    
145                    SelectableChannel readSelectableChannel =
146                            (SelectableChannel)scatteringByteChannel;
147                    SelectableChannel writeSelectableChannel =
148                            (SelectableChannel)gatheringByteChannel;
149    
150                    if ((readSelectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
151                            throw new IllegalArgumentException(
152                                    "Scattering byte channel is not valid for reading");
153                    }
154    
155                    if ((writeSelectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
156                            throw new IllegalArgumentException(
157                                    "Gathering byte channel is not valid for writing");
158                    }
159    
160                    ensureOpen();
161    
162                    readSelectableChannel.configureBlocking(false);
163                    writeSelectableChannel.configureBlocking(false);
164    
165                    FutureTask<RegistrationReference> registerFutureTask =
166                            new FutureTask<RegistrationReference>(
167                                    new RegisterCallable(
168                                            readSelectableChannel, writeSelectableChannel));
169    
170                    registerQueue.offer(registerFutureTask);
171    
172                    selector.wakeup();
173    
174                    try {
175                            return registerFutureTask.get();
176                    }
177                    catch (Exception e) {
178                            throw new IOException(e);
179                    }
180            }
181    
182            @Override
183            protected void doSendDatagram(
184                    RegistrationReference registrationReference, Datagram datagram) {
185    
186                    SelectionKeyRegistrationReference selectionKeyRegistrationReference =
187                            (SelectionKeyRegistrationReference)registrationReference;
188    
189                    SelectionKey writeSelectionKey =
190                            selectionKeyRegistrationReference.writeSelectionKey;
191    
192                    ChannelContext channelContext =
193                            (ChannelContext)writeSelectionKey.attachment();
194    
195                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
196    
197                    sendingQueue.offer(datagram);
198    
199                    synchronized (writeSelectionKey) {
200                            int ops = writeSelectionKey.interestOps();
201    
202                            if ((ops & SelectionKey.OP_WRITE) == 0) {
203                                    ops |= SelectionKey.OP_WRITE;
204    
205                                    writeSelectionKey.interestOps(ops);
206    
207                                    selector.wakeup();
208                            }
209                    }
210            }
211    
212            protected void registerChannels() {
213                    FutureTask<RegistrationReference> registerFuturetask = null;
214    
215                    while ((registerFuturetask = registerQueue.poll()) != null) {
216                            registerFuturetask.run();
217                    }
218            }
219    
220            protected static final ThreadFactory threadFactory =
221                    new NamedThreadFactory(
222                            SelectorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
223                            SelectorIntraband.class.getClassLoader());
224    
225            protected final Thread pollingThread = threadFactory.newThread(
226                    new PollingJob());
227            protected final Queue<FutureTask<RegistrationReference>> registerQueue =
228                    new ConcurrentLinkedQueue<FutureTask<RegistrationReference>>();
229            protected final Selector selector = Selector.open();
230    
231            protected class RegisterCallable
232                    implements Callable<RegistrationReference> {
233    
234                    public RegisterCallable(
235                            SelectableChannel readSelectableChannel,
236                            SelectableChannel writeSelectableChannel) {
237    
238                            _readSelectableChannel = readSelectableChannel;
239                            _writeSelectableChannel = writeSelectableChannel;
240                    }
241    
242                    @Override
243                    public RegistrationReference call() throws Exception {
244                            if (_readSelectableChannel == _writeSelectableChannel) {
245    
246                                    // Register channel with zero interest, no dispatch will happen
247                                    // before channel context is ready. This ensures thread safe
248                                    // publication for ChannelContext#_registrationReference.
249    
250                                    SelectionKey selectionKey = _readSelectableChannel.register(
251                                            selector, 0);
252    
253                                    SelectionKeyRegistrationReference
254                                            selectionKeyRegistrationReference =
255                                                    new SelectionKeyRegistrationReference(
256                                                            SelectorIntraband.this, selectionKey, selectionKey);
257    
258                                    ChannelContext channelContext = new ChannelContext(
259                                            new ConcurrentLinkedQueue<Datagram>());
260    
261                                    channelContext.setRegistrationReference(
262                                            selectionKeyRegistrationReference);
263    
264                                    selectionKey.attach(channelContext);
265    
266                                    // Alter interest ops after preparing the channel context
267    
268                                    selectionKey.interestOps(SelectionKey.OP_READ);
269    
270                                    return selectionKeyRegistrationReference;
271                            }
272                            else {
273    
274                                    // Register channels with zero interest, no dispatch will happen
275                                    // before channel contexts are ready. This ensures thread safe
276                                    // publication for ChannelContext#_registrationReference.
277    
278                                    SelectionKey readSelectionKey = _readSelectableChannel.register(
279                                            selector, 0);
280    
281                                    SelectionKey writeSelectionKey =
282                                            _writeSelectableChannel.register(selector, 0);
283    
284                                    SelectionKeyRegistrationReference
285                                            selectionKeyRegistrationReference =
286                                                    new SelectionKeyRegistrationReference(
287                                                            SelectorIntraband.this, readSelectionKey,
288                                                            writeSelectionKey);
289    
290                                    ChannelContext channelContext = new ChannelContext(
291                                            new ConcurrentLinkedQueue<Datagram>());
292    
293                                    channelContext.setRegistrationReference(
294                                            selectionKeyRegistrationReference);
295    
296                                    readSelectionKey.attach(channelContext);
297                                    writeSelectionKey.attach(channelContext);
298    
299                                    // Alter interest ops after ChannelContexts preparation
300    
301                                    readSelectionKey.interestOps(SelectionKey.OP_READ);
302    
303                                    return selectionKeyRegistrationReference;
304                            }
305                    }
306    
307                    private final SelectableChannel _readSelectableChannel;
308                    private final SelectableChannel _writeSelectableChannel;
309    
310            }
311    
312            private void _processReading(SelectionKey selectionKey) {
313                    ScatteringByteChannel scatteringByteChannel =
314                            (ScatteringByteChannel)selectionKey.channel();
315    
316                    ChannelContext channelContext =
317                            (ChannelContext)selectionKey.attachment();
318    
319                    handleReading(scatteringByteChannel, channelContext);
320            }
321    
322            private void _processWriting(SelectionKey selectionKey) {
323                    GatheringByteChannel gatheringByteChannel =
324                            (GatheringByteChannel)selectionKey.channel();
325    
326                    ChannelContext channelContext =
327                            (ChannelContext)selectionKey.attachment();
328    
329                    Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
330    
331                    if (channelContext.getWritingDatagram() == null) {
332                            channelContext.setWritingDatagram(sendingQueue.poll());
333                    }
334    
335                    boolean backOff = false;
336    
337                    if (channelContext.getWritingDatagram() != null) {
338                            if (handleWriting(gatheringByteChannel, channelContext)) {
339                                    if (sendingQueue.isEmpty()) {
340                                            backOff = true;
341                                    }
342                            }
343                    }
344                    else {
345                            backOff = true;
346                    }
347    
348                    if (backOff) {
349    
350                            // Channel is still writable, but there is nothing to send, back off
351                            // to prevent unnecessary busy spinning.
352    
353                            int ops = selectionKey.interestOps();
354    
355                            ops &= ~SelectionKey.OP_WRITE;
356    
357                            synchronized (selectionKey) {
358                                    if (sendingQueue.isEmpty()) {
359                                            selectionKey.interestOps(ops);
360                                    }
361                            }
362                    }
363            }
364    
365            private static Log _log = LogFactoryUtil.getLog(SelectorIntraband.class);
366    
367            private class PollingJob implements Runnable {
368    
369                    @Override
370                    public void run() {
371                            try {
372                                    try {
373                                            while (true) {
374                                                    int readyCount = selector.select();
375    
376                                                    if (readyCount > 0) {
377                                                            Set<SelectionKey> selectionKeys =
378                                                                    selector.selectedKeys();
379    
380                                                            Iterator<SelectionKey> iterator =
381                                                                    selectionKeys.iterator();
382    
383                                                            while (iterator.hasNext()) {
384                                                                    SelectionKey selectionKey = iterator.next();
385    
386                                                                    iterator.remove();
387    
388                                                                    try {
389                                                                            if (selectionKey.isReadable()) {
390                                                                                    _processReading(selectionKey);
391                                                                            }
392    
393                                                                            if (selectionKey.isWritable()) {
394                                                                                    _processWriting(selectionKey);
395                                                                            }
396                                                                    }
397                                                                    catch (CancelledKeyException cke) {
398    
399                                                                            // Concurrent cancelling, move to next key
400    
401                                                                    }
402                                                            }
403                                                    }
404                                                    else if (!selector.isOpen()) {
405                                                            break;
406                                                    }
407    
408                                                    registerChannels();
409                                                    cleanUpTimeoutResponseWaitingDatagrams();
410                                            }
411                                    }
412                                    finally {
413                                            selector.close();
414                                    }
415                            }
416                            catch (ClosedSelectorException cse) {
417                                    if (_log.isInfoEnabled()) {
418                                            Thread currentThread = Thread.currentThread();
419    
420                                            _log.info(
421                                                    currentThread.getName() +
422                                                            " exiting gracefully on selector closure");
423                                    }
424                            }
425                            catch (Throwable t) {
426                                    Thread currentThread = Thread.currentThread();
427    
428                                    _log.error(
429                                            currentThread.getName() + " exiting exceptionally", t);
430                            }
431    
432                            // Flush out pending register requests to unblock their invokers,
433                            // this will cause them to receive a ClosedSelectorException
434    
435                            registerChannels();
436    
437                            responseWaitingMap.clear();
438                            timeoutMap.clear();
439                    }
440    
441            }
442    
443    }