001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
020    
021    import java.io.IOException;
022    
023    import java.nio.channels.GatheringByteChannel;
024    import java.nio.channels.ScatteringByteChannel;
025    
026    import java.util.EnumSet;
027    import java.util.Iterator;
028    import java.util.Map;
029    import java.util.NavigableMap;
030    import java.util.Set;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.ConcurrentSkipListMap;
033    import java.util.concurrent.CountDownLatch;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.TimeoutException;
036    import java.util.concurrent.atomic.AtomicLong;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    /**
040     * @author Shuyang Zhou
041     */
042    public abstract class BaseIntraband implements Intraband {
043    
044            public BaseIntraband(long defaultTimeout) {
045                    this.defaultTimeout = defaultTimeout;
046            }
047    
048            @Override
049            @SuppressWarnings("unused")
050            public void close() throws InterruptedException, IOException {
051                    datagramReceiveHandlersReference.set(null);
052    
053                    open = false;
054            }
055    
056            @Override
057            public DatagramReceiveHandler[] getDatagramReceiveHandlers() {
058                    ensureOpen();
059    
060                    DatagramReceiveHandler[] datagramReceiveHandlers =
061                            datagramReceiveHandlersReference.get();
062    
063                    return datagramReceiveHandlers.clone();
064            }
065    
066            @Override
067            public boolean isOpen() {
068                    return open;
069            }
070    
071            @Override
072            public DatagramReceiveHandler registerDatagramReceiveHandler(
073                    byte type, DatagramReceiveHandler datagramReceiveHandler) {
074    
075                    ensureOpen();
076    
077                    int index = type & 0xFF;
078    
079                    DatagramReceiveHandler oldDatagramReceiveHandler = null;
080                    DatagramReceiveHandler[] datagramReceiveHandlers = null;
081                    DatagramReceiveHandler[] copyDatagramReceiveHandlers = null;
082    
083                    do {
084                            datagramReceiveHandlers = datagramReceiveHandlersReference.get();
085    
086                            copyDatagramReceiveHandlers = datagramReceiveHandlers.clone();
087    
088                            oldDatagramReceiveHandler = copyDatagramReceiveHandlers[index];
089    
090                            copyDatagramReceiveHandlers[index] = datagramReceiveHandler;
091                    }
092                    while (
093                            !datagramReceiveHandlersReference.compareAndSet(
094                                    datagramReceiveHandlers, copyDatagramReceiveHandlers));
095    
096                    return oldDatagramReceiveHandler;
097            }
098    
099            @Override
100            public void sendDatagram(
101                    RegistrationReference registrationReference, Datagram datagram) {
102    
103                    if (registrationReference == null) {
104                            throw new NullPointerException("Registration reference is null");
105                    }
106    
107                    if (!registrationReference.isValid()) {
108                            throw new IllegalArgumentException(
109                                    "Registration reference is invalid");
110                    }
111    
112                    if (datagram == null) {
113                            throw new NullPointerException("Datagram is null");
114                    }
115    
116                    ensureOpen();
117    
118                    doSendDatagram(registrationReference, datagram);
119            }
120    
121            @Override
122            public <A> void sendDatagram(
123                    RegistrationReference registrationReference, Datagram datagram,
124                    A attachment, EnumSet<CompletionHandler.CompletionType> completionTypes,
125                    CompletionHandler<A> completionHandler) {
126    
127                    sendDatagram(
128                            registrationReference, datagram, attachment, completionTypes,
129                            completionHandler, defaultTimeout, TimeUnit.MILLISECONDS);
130            }
131    
132            @Override
133            public <A> void sendDatagram(
134                    RegistrationReference registrationReference, Datagram datagram,
135                    A attachment, EnumSet<CompletionType> completionTypes,
136                    CompletionHandler<A> completionHandler, long timeout,
137                    TimeUnit timeUnit) {
138    
139                    if (registrationReference == null) {
140                            throw new NullPointerException("Registration reference is null");
141                    }
142    
143                    if (!registrationReference.isValid()) {
144                            throw new IllegalArgumentException(
145                                    "Registration reference is invalid");
146                    }
147    
148                    if (datagram == null) {
149                            throw new NullPointerException("Datagram is null");
150                    }
151    
152                    if (completionTypes == null) {
153                            throw new NullPointerException("Completion type set is null");
154                    }
155    
156                    if (completionTypes.isEmpty()) {
157                            throw new IllegalArgumentException("Completion type set is empty");
158                    }
159    
160                    if (completionHandler == null) {
161                            throw new NullPointerException("Complete handler is null");
162                    }
163    
164                    if (timeUnit == null) {
165                            throw new NullPointerException("Time unit is null");
166                    }
167    
168                    if (timeout <= 0) {
169                            timeout = defaultTimeout;
170                    }
171                    else {
172                            timeout = timeUnit.toMillis(timeout);
173                    }
174    
175                    ensureOpen();
176    
177                    datagram.attachment = attachment;
178                    datagram.completionHandler =
179                            (CompletionHandler<Object>)completionHandler;
180                    datagram.completionTypes = completionTypes;
181                    datagram.timeout = timeout;
182    
183                    datagram.setAckRequest(
184                            completionTypes.contains(CompletionType.DELIVERED));
185    
186                    if (datagram.getSequenceId() == 0) {
187                            datagram.setSequenceId(generateSequenceId());
188                    }
189    
190                    if (completionTypes.contains(CompletionType.DELIVERED) ||
191                            completionTypes.contains(CompletionType.REPLIED)) {
192    
193                            addResponseWaitingDatagram(datagram);
194                    }
195    
196                    doSendDatagram(registrationReference, datagram);
197            }
198    
199            @Override
200            public Datagram sendSyncDatagram(
201                            RegistrationReference registrationReference, Datagram datagram)
202                    throws InterruptedException, IOException, TimeoutException {
203    
204                    return sendSyncDatagram(
205                            registrationReference, datagram, defaultTimeout,
206                            TimeUnit.MILLISECONDS);
207            }
208    
209            @Override
210            public Datagram sendSyncDatagram(
211                            RegistrationReference registrationReference, Datagram datagram,
212                            long timeout, TimeUnit timeUnit)
213                    throws InterruptedException, IOException, TimeoutException {
214    
215                    if (registrationReference == null) {
216                            throw new NullPointerException("Registration reference is null");
217                    }
218    
219                    if (!registrationReference.isValid()) {
220                            throw new IllegalArgumentException(
221                                    "Registration reference is invalid");
222                    }
223    
224                    if (datagram == null) {
225                            throw new NullPointerException("Datagram is null");
226                    }
227    
228                    if (timeUnit == null) {
229                            throw new NullPointerException("Time unit is null");
230                    }
231    
232                    if (timeout <= 0) {
233                            timeout = defaultTimeout;
234                    }
235                    else {
236                            timeout = timeUnit.toMillis(timeout);
237                    }
238    
239                    ensureOpen();
240    
241                    return doSendSyncDatagram(registrationReference, datagram, timeout);
242            }
243    
244            @Override
245            public DatagramReceiveHandler unregisterDatagramReceiveHandler(byte type) {
246                    return registerDatagramReceiveHandler(type, null);
247            }
248    
249            protected void addResponseWaitingDatagram(Datagram requestDatagram) {
250                    long sequenceId = requestDatagram.getSequenceId();
251    
252                    long expireTime = System.currentTimeMillis() + requestDatagram.timeout;
253    
254                    requestDatagram.expireTime = expireTime;
255    
256                    responseWaitingMap.put(sequenceId, requestDatagram);
257    
258                    timeoutMap.put(expireTime, sequenceId);
259            }
260    
261            protected void cleanUpTimeoutResponseWaitingDatagrams() {
262                    Map<Long, Long> map = timeoutMap.headMap(
263                            System.currentTimeMillis(), true);
264    
265                    if (map.isEmpty()) {
266                            return;
267                    }
268    
269                    Set<Map.Entry<Long, Long>> set = map.entrySet();
270    
271                    Iterator<Map.Entry<Long, Long>> iterator = set.iterator();
272    
273                    while (iterator.hasNext()) {
274                            Map.Entry<Long, Long> entry = iterator.next();
275    
276                            iterator.remove();
277    
278                            Long sequenceId = entry.getValue();
279    
280                            Datagram datagram = responseWaitingMap.remove(sequenceId);
281    
282                            if (_log.isWarnEnabled()) {
283                                    _log.warn(
284                                            "Removed timeout response waiting datagram " + datagram);
285                            }
286    
287                            datagram.completionHandler.timedOut(datagram.attachment);
288                    }
289            }
290    
291            protected abstract void doSendDatagram(
292                    RegistrationReference registrationReference, Datagram datagram);
293    
294            protected Datagram doSendSyncDatagram(
295                            RegistrationReference registrationReference, Datagram datagram,
296                            long timeout)
297                    throws InterruptedException, IOException, TimeoutException {
298    
299                    SendSyncDatagramCompletionHandler sendSyncDatagramCompletionHandler =
300                            new SendSyncDatagramCompletionHandler();
301    
302                    datagram.completionHandler = sendSyncDatagramCompletionHandler;
303                    datagram.completionTypes = REPLIED_ENUM_SET;
304                    datagram.timeout = timeout;
305    
306                    if (datagram.getSequenceId() == 0) {
307                            datagram.setSequenceId(generateSequenceId());
308                    }
309    
310                    addResponseWaitingDatagram(datagram);
311    
312                    doSendDatagram(registrationReference, datagram);
313    
314                    return sendSyncDatagramCompletionHandler.waitResult(timeout);
315            }
316    
317            protected void ensureOpen() {
318                    if (!isOpen()) {
319                            throw new ClosedIntrabandException();
320                    }
321            }
322    
323            protected long generateSequenceId() {
324                    long sequenceId = sequenceIdGenerator.getAndIncrement();
325    
326                    if (sequenceId < 0) {
327    
328                            // We assume a long primitive type can hold enough numbers to keep a
329                            // large window time between the earliest and the latest response
330                            // waiting datagrams. In a real system, we will run out of memory
331                            // long before the latest response waiting datagram's ID can catch
332                            // up to the earliest response waiting datagram's ID to cause an ID
333                            // conflict. Even if the sequence ID generator was the only code to
334                            // use memory (which will never be true), to see an ID conflict, we
335                            // need to hold up 2^63 references. Even if we did not factor in the
336                            // data inside the datagram, and considered just the references
337                            // themselves, we would need 2^65 byte or 32 EB (exbibyte) of
338                            // memory, which is impossible in existing computer systems.
339    
340                            sequenceId += Long.MIN_VALUE;
341                    }
342    
343                    return sequenceId;
344            }
345    
346            protected void handleReading(
347                    ScatteringByteChannel scatteringByteChannel,
348                    ChannelContext channelContext) {
349    
350                    Datagram datagram = channelContext.getReadingDatagram();
351    
352                    if (datagram == null) {
353                            datagram = Datagram.createReceiveDatagram();
354    
355                            channelContext.setReadingDatagram(datagram);
356                    }
357    
358                    try {
359                            if (datagram.readFrom(scatteringByteChannel)) {
360                                    channelContext.setReadingDatagram(
361                                            Datagram.createReceiveDatagram());
362    
363                                    if (datagram.isAckResponse()) {
364                                            Datagram requestDatagram = removeResponseWaitingDatagram(
365                                                    datagram);
366    
367                                            if (requestDatagram == null) {
368                                                    if (_log.isWarnEnabled()) {
369                                                            _log.warn(
370                                                                    "Dropped ownerless ACK response " + datagram);
371                                                    }
372                                            }
373                                            else {
374                                                    CompletionHandler<Object> completionHandler =
375                                                            requestDatagram.completionHandler;
376    
377                                                    completionHandler.delivered(requestDatagram.attachment);
378                                            }
379                                    }
380                                    else if (datagram.isResponse()) {
381                                            Datagram requestDatagram = removeResponseWaitingDatagram(
382                                                    datagram);
383    
384                                            if (requestDatagram == null) {
385                                                    if (_log.isWarnEnabled()) {
386                                                            _log.warn("Dropped ownerless response " + datagram);
387                                                    }
388                                            }
389                                            else {
390                                                    EnumSet<CompletionType> completionTypes =
391                                                            requestDatagram.completionTypes;
392    
393                                                    if (completionTypes.contains(CompletionType.REPLIED)) {
394                                                            CompletionHandler<Object> completionHandler =
395                                                                    requestDatagram.completionHandler;
396    
397                                                            completionHandler.replied(
398                                                                    requestDatagram.attachment, datagram);
399                                                    }
400                                                    else if (_log.isWarnEnabled()) {
401                                                            _log.warn(
402                                                                    "Dropped unconcerned response " + datagram);
403                                                    }
404                                            }
405                                    }
406                                    else {
407                                            if (datagram.isAckRequest()) {
408                                                    Datagram ackResponseDatagram =
409                                                            Datagram.createACKResponseDatagram(
410                                                                    datagram.getSequenceId());
411    
412                                                    doSendDatagram(
413                                                            channelContext.getRegistrationReference(),
414                                                            ackResponseDatagram);
415                                            }
416    
417                                            int index = datagram.getType() & 0xFF;
418    
419                                            DatagramReceiveHandler datagramReceiveHandler =
420                                                    datagramReceiveHandlersReference.get()[index];
421    
422                                            if (datagramReceiveHandler == null) {
423                                                    if (_log.isWarnEnabled()) {
424                                                            _log.warn("Dropped ownerless request " + datagram);
425                                                    }
426                                            }
427                                            else {
428                                                    try {
429                                                            datagramReceiveHandler.receive(
430                                                                    channelContext.getRegistrationReference(),
431                                                                    datagram);
432                                                    }
433                                                    catch (Throwable t) {
434                                                            _log.error("Unable to dispatch", t);
435                                                    }
436                                            }
437                                    }
438                            }
439                    }
440                    catch (IOException ioe) {
441                            RegistrationReference registrationReference =
442                                    channelContext.getRegistrationReference();
443    
444                            registrationReference.cancelRegistration();
445    
446                            if (_log.isDebugEnabled()) {
447                                    _log.debug(
448                                            "Broken read channel, unregister " + registrationReference,
449                                            ioe);
450                            }
451                            else if (_log.isInfoEnabled()) {
452                                    _log.info(
453                                            "Broken read channel, unregister " + registrationReference);
454                            }
455                    }
456            }
457    
458            protected boolean handleWriting(
459                    GatheringByteChannel gatheringByteChannel,
460                    ChannelContext channelContext) {
461    
462                    Datagram datagram = channelContext.getWritingDatagram();
463    
464                    try {
465                            if (datagram.writeTo(gatheringByteChannel)) {
466                                    channelContext.setWritingDatagram(null);
467    
468                                    EnumSet<CompletionType> completionTypes =
469                                            datagram.completionTypes;
470    
471                                    if (completionTypes != null) {
472                                            if (completionTypes.contains(CompletionType.SUBMITTED)) {
473                                                    CompletionHandler<Object> completeHandler =
474                                                            datagram.completionHandler;
475    
476                                                    completeHandler.submitted(datagram.attachment);
477                                            }
478                                    }
479    
480                                    return true;
481                            }
482                            else {
483                                    return false;
484                            }
485                    }
486                    catch (IOException ioe) {
487                            RegistrationReference registrationReference =
488                                    channelContext.getRegistrationReference();
489    
490                            registrationReference.cancelRegistration();
491    
492                            CompletionHandler<Object> completionHandler =
493                                    datagram.completionHandler;
494    
495                            if (completionHandler != null) {
496                                    completionHandler.failed(datagram.attachment, ioe);
497                            }
498    
499                            if (_log.isDebugEnabled()) {
500                                    _log.debug(
501                                            "Broken write channel, unregister " + registrationReference,
502                                            ioe);
503                            }
504                            else if (_log.isInfoEnabled()) {
505                                    _log.info(
506                                            "Broken write channel, unregister " +
507                                                    registrationReference);
508                            }
509    
510                            return false;
511                    }
512            }
513    
514            protected Datagram removeResponseWaitingDatagram(
515                    Datagram responseDatagram) {
516    
517                    long sequenceId = responseDatagram.getSequenceId();
518    
519                    Datagram requestDatagram = responseWaitingMap.remove(sequenceId);
520    
521                    if (requestDatagram != null) {
522                            timeoutMap.remove(requestDatagram.expireTime);
523                    }
524    
525                    return requestDatagram;
526            }
527    
528            protected static final EnumSet<CompletionType> REPLIED_ENUM_SET =
529                    EnumSet.of(CompletionType.REPLIED);
530    
531            protected final long defaultTimeout;
532            protected final AtomicReference<DatagramReceiveHandler[]>
533                    datagramReceiveHandlersReference =
534                            new AtomicReference<DatagramReceiveHandler[]>(
535                                    new DatagramReceiveHandler[256]);
536            protected volatile boolean open = true;
537            protected final Map<Long, Datagram> responseWaitingMap =
538                    new ConcurrentHashMap<Long, Datagram>();
539            protected final AtomicLong sequenceIdGenerator = new AtomicLong();
540            protected final NavigableMap<Long, Long> timeoutMap =
541                    new ConcurrentSkipListMap<Long, Long>();
542    
543            protected static class SendSyncDatagramCompletionHandler
544                    implements CompletionHandler<Object> {
545    
546                    @Override
547                    public void delivered(Object attachment) {
548                    }
549    
550                    @Override
551                    public void failed(Object attachment, IOException ioe) {
552    
553                            // Must set before count down to ensure memory visibility
554    
555                            _ioe = ioe;
556    
557                            _countDownLatch.countDown();
558                    }
559    
560                    @Override
561                    public void replied(Object attachment, Datagram datagram) {
562    
563                            // Must set before count down to ensure memory visibility
564    
565                            _datagram = datagram;
566    
567                            _countDownLatch.countDown();
568                    }
569    
570                    @Override
571                    public void submitted(Object attachment) {
572                    }
573    
574                    @Override
575                    public void timedOut(Object attachment) {
576                    }
577    
578                    public Datagram waitResult(long timeout)
579                            throws InterruptedException, IOException, TimeoutException {
580    
581                            boolean result = _countDownLatch.await(
582                                    timeout, TimeUnit.MILLISECONDS);
583    
584                            if (!result) {
585                                    throw new TimeoutException("Result waiting timeout");
586                            }
587    
588                            if (_ioe != null) {
589                                    throw _ioe;
590                            }
591    
592                            return _datagram;
593                    }
594    
595                    private final CountDownLatch _countDownLatch = new CountDownLatch(1);
596                    private Datagram _datagram;
597                    private IOException _ioe;
598    
599            }
600    
601            private static Log _log = LogFactoryUtil.getLog(BaseIntraband.class);
602    
603    }