001
014
015 package com.liferay.portal.kernel.nio.intraband.mailbox;
016
017 import com.liferay.portal.kernel.nio.intraband.Datagram;
018 import com.liferay.portal.kernel.nio.intraband.Intraband;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
021 import com.liferay.portal.kernel.util.GetterUtil;
022 import com.liferay.portal.kernel.util.PropsKeys;
023 import com.liferay.portal.kernel.util.PropsUtil;
024
025 import java.nio.ByteBuffer;
026
027 import java.util.Map;
028 import java.util.concurrent.BlockingQueue;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.DelayQueue;
031 import java.util.concurrent.Delayed;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicLong;
034
035
038 public class MailboxUtil {
039
040 public static ByteBuffer receiveMail(long receipt) {
041 ByteBuffer byteBuffer = _mailMap.remove(receipt);
042
043 _overdueMailQueue.remove(new ReceiptStub(receipt));
044
045 if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
046 _pollingCleanup();
047 }
048
049 return byteBuffer;
050 }
051
052 public static long sendMail(
053 RegistrationReference registrationReference, ByteBuffer byteBuffer)
054 throws MailboxException {
055
056 Intraband intraband = registrationReference.getIntraband();
057
058 try {
059 SystemDataType systemDataType = SystemDataType.MAILBOX;
060
061 Datagram responseDatagram = intraband.sendSyncDatagram(
062 registrationReference,
063 Datagram.createRequestDatagram(
064 systemDataType.getValue(), byteBuffer));
065
066 byteBuffer = responseDatagram.getDataByteBuffer();
067
068 return byteBuffer.getLong();
069 }
070 catch (Exception e) {
071 throw new MailboxException(e);
072 }
073 }
074
075 static long depositMail(ByteBuffer byteBuffer) {
076 long receipt = _receiptGenerator.getAndIncrement();
077
078 _mailMap.put(receipt, byteBuffer);
079
080 _overdueMailQueue.offer(new ReceiptStub(receipt, System.nanoTime()));
081
082 if (!_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
083 _pollingCleanup();
084 }
085
086 return receipt;
087 }
088
089 private static void _pollingCleanup() {
090 ReceiptStub receiptStub = null;
091
092 while ((receiptStub = _overdueMailQueue.poll()) != null) {
093 _mailMap.remove(receiptStub.getReceipt());
094 }
095 }
096
097 private static final boolean _INTRABAND_MAILBOX_REAPER_THREAD_ENABLED =
098 GetterUtil.getBoolean(
099 PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_REAPER_THREAD_ENABLED));
100
101 private static final long _INTRABAND_MAILBOX_STORAGE_LIFE =
102 GetterUtil.getLong(
103 PropsUtil.get(PropsKeys.INTRABAND_MAILBOX_STORAGE_LIFE));
104
105 private final static Map<Long, ByteBuffer> _mailMap =
106 new ConcurrentHashMap<Long, ByteBuffer>();
107 private final static BlockingQueue<ReceiptStub> _overdueMailQueue =
108 new DelayQueue<ReceiptStub>();
109 private final static AtomicLong _receiptGenerator = new AtomicLong();
110
111 private static class OverdueMailReaperThread extends Thread {
112
113 public OverdueMailReaperThread(String name) {
114 super(name);
115 }
116
117 @Override
118 public void run() {
119 while (true) {
120 try {
121 ReceiptStub receiptStub = _overdueMailQueue.take();
122
123 _mailMap.remove(receiptStub.getReceipt());
124 }
125 catch (InterruptedException ie) {
126 }
127 }
128 }
129 }
130
131 private static class ReceiptStub implements Delayed {
132
133 public ReceiptStub(long receipt) {
134 this(receipt, -1);
135 }
136
137 public ReceiptStub(long receipt, long currentNanoTime) {
138 _expireTime = currentNanoTime + TimeUnit.MILLISECONDS.toNanos(
139 _INTRABAND_MAILBOX_STORAGE_LIFE);
140 _receipt = receipt;
141 }
142
143 @Override
144 public int compareTo(Delayed delayed) {
145 ReceiptStub receiptStub = (ReceiptStub)delayed;
146
147 return (int)(_expireTime - receiptStub._expireTime);
148 }
149
150 @Override
151 public boolean equals(Object obj) {
152 ReceiptStub receiptStub = (ReceiptStub)obj;
153
154 return _receipt == receiptStub._receipt;
155 }
156
157 public long getReceipt() {
158 return _receipt;
159 }
160
161 @Override
162 public long getDelay(TimeUnit unit) {
163 return _expireTime - System.nanoTime();
164 }
165
166 private final long _expireTime;
167 private final long _receipt;
168
169 }
170
171 static {
172 if (_INTRABAND_MAILBOX_REAPER_THREAD_ENABLED) {
173 Thread thread = new OverdueMailReaperThread(
174 MailboxUtil.class.getName());
175
176 thread.setContextClassLoader(MailboxUtil.class.getClassLoader());
177 thread.setDaemon(true);
178
179 thread.start();
180 }
181 }
182
183 }