001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.mailbox;
016    
017    import com.liferay.portal.kernel.nio.intraband.Datagram;
018    import com.liferay.portal.kernel.nio.intraband.Intraband;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
021    import com.liferay.portal.kernel.util.GetterUtil;
022    import com.liferay.portal.kernel.util.PropsKeys;
023    import com.liferay.portal.kernel.util.PropsUtil;
024    
025    import java.nio.ByteBuffer;
026    
027    import java.util.Map;
028    import java.util.concurrent.BlockingQueue;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.DelayQueue;
031    import java.util.concurrent.Delayed;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicLong;
034    
035    /**
036     * @author Shuyang Zhou
037     */
038    public class MailboxUtil {
039    
040            public static ByteBuffer receiveMail(long receipt) {
041                    ByteBuffer byteBuffer = _mailMap.remove(receipt);
042    
043                    _overdueMailQueue.remove(new ReceiptStub(receipt));
044    
045                    if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
046                            _pollingCleanup();
047                    }
048    
049                    return byteBuffer;
050            }
051    
052            public static long sendMail(
053                            RegistrationReference registrationReference, ByteBuffer byteBuffer)
054                    throws MailboxException {
055    
056                    Intraband intraband = registrationReference.getIntraband();
057    
058                    try {
059                            SystemDataType systemDataType = SystemDataType.MAILBOX;
060    
061                            Datagram responseDatagram = intraband.sendSyncDatagram(
062                                    registrationReference,
063                                    Datagram.createRequestDatagram(
064                                            systemDataType.getValue(), byteBuffer));
065    
066                            byteBuffer = responseDatagram.getDataByteBuffer();
067    
068                            return byteBuffer.getLong();
069                    }
070                    catch (Exception e) {
071                            throw new MailboxException(e);
072                    }
073            }
074    
075            static long depositMail(ByteBuffer byteBuffer) {
076                    long receipt = _receiptGenerator.getAndIncrement();
077    
078                    _mailMap.put(receipt, byteBuffer);
079    
080                    _overdueMailQueue.offer(new ReceiptStub(receipt, System.nanoTime()));
081    
082                    if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
083                            _pollingCleanup();
084                    }
085    
086                    return receipt;
087            }
088    
089            private static void _pollingCleanup() {
090                    ReceiptStub receiptStub = null;
091    
092                    while ((receiptStub = _overdueMailQueue.poll()) != null) {
093                            _mailMap.remove(receiptStub.getReceipt());
094                    }
095            }
096    
097            private static final boolean _INTRABAND_MAILBOX_REAPER_THREAD_ENABLED =
098                    GetterUtil.getBoolean(
099                            PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_REAPER_THREAD_ENABLED));
100    
101            private static final long _INTRABAND_MAILBOX_STORAGE_LIFE =
102                    GetterUtil.getLong(
103                            PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_STORAGE_LIFE));
104    
105            private final static Map<Long, ByteBuffer> _mailMap =
106                    new ConcurrentHashMap<Long, ByteBuffer>();
107            private final static BlockingQueue<ReceiptStub> _overdueMailQueue =
108                    new DelayQueue<ReceiptStub>();
109            private final static AtomicLong _receiptGenerator = new AtomicLong();
110    
111            private static class OverdueMailReaperThread extends Thread {
112    
113                    public OverdueMailReaperThread(String name) {
114                            super(name);
115                    }
116    
117                    @Override
118                    public void run() {
119                            while (true) {
120                                    try {
121                                            ReceiptStub receiptStub = _overdueMailQueue.take();
122    
123                                            _mailMap.remove(receiptStub.getReceipt());
124                                    }
125                                    catch (InterruptedException ie) {
126                                    }
127                            }
128                    }
129            }
130    
131            private static class ReceiptStub implements Delayed {
132    
133                    public ReceiptStub(long receipt) {
134                            this(receipt, -1);
135                    }
136    
137                    public ReceiptStub(long receipt, long currentNanoTime) {
138                            _expireTime = currentNanoTime + TimeUnit.MILLISECONDS.toNanos(
139                                    _INTRABAND_MAILBOX_STORAGE_LIFE);
140                            _receipt = receipt;
141                    }
142    
143                    @Override
144                    public int compareTo(Delayed delayed) {
145                            ReceiptStub receiptStub = (ReceiptStub)delayed;
146    
147                            return (int)(_expireTime - receiptStub._expireTime);
148                    }
149    
150                    @Override
151                    public boolean equals(Object obj) {
152                            ReceiptStub receiptStub = (ReceiptStub)obj;
153    
154                            return _receipt == receiptStub._receipt;
155                    }
156    
157                    public long getReceipt() {
158                            return _receipt;
159                    }
160    
161                    @Override
162                    public long getDelay(TimeUnit unit) {
163                            return _expireTime - System.nanoTime();
164                    }
165    
166                    private final long _expireTime;
167                    private final long _receipt;
168    
169            }
170    
171            static {
172                    if (_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
173                            Thread thread = new OverdueMailReaperThread(
174                                    MailboxUtil.class.getName());
175    
176                            thread.setContextClassLoader(MailboxUtil.class.getClassLoader());
177                            thread.setDaemon(true);
178    
179                            thread.start();
180                    }
181            }
182    
183    }