001
014
015 package com.liferay.portal.kernel.nio.intraband;
016
017 import com.liferay.portal.kernel.io.BigEndianCodec;
018 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
019 import com.liferay.portal.kernel.util.StringBundler;
020 import com.liferay.portal.kernel.util.StringPool;
021
022 import java.io.EOFException;
023 import java.io.IOException;
024
025 import java.nio.ByteBuffer;
026 import java.nio.channels.GatheringByteChannel;
027 import java.nio.channels.ScatteringByteChannel;
028
029 import java.util.EnumSet;
030
031
068 public class Datagram {
069
070 public static Datagram createRequestDatagram(byte type, byte[] data) {
071 return createRequestDatagram(type, ByteBuffer.wrap(data));
072 }
073
074 public static Datagram createRequestDatagram(
075 byte type, ByteBuffer dataByteBuffer) {
076
077 Datagram datagram = new Datagram();
078
079
080
081 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_REQUEST;
082
083
084
085
086
087 datagram._headerBufferArray[_INDEX_DATA_TYPE] = type;
088
089
090
091 BigEndianCodec.putInt(
092 datagram._headerBufferArray, _INDEX_DATA_SIZE,
093 dataByteBuffer.remaining());
094
095
096
097 datagram._dataByteBuffer = dataByteBuffer;
098
099 return datagram;
100 }
101
102 public static Datagram createResponseDatagram(
103 Datagram requestDatagram, byte[] data) {
104
105 return createResponseDatagram(requestDatagram, ByteBuffer.wrap(data));
106 }
107
108 public static Datagram createResponseDatagram(
109 Datagram requestDatagram, ByteBuffer byteBuffer) {
110
111 Datagram datagram = new Datagram();
112
113
114
115 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_RESPONSE;
116
117
118
119 BigEndianCodec.putLong(
120 datagram._headerBufferArray, _INDEX_SEQUENCE_ID,
121 requestDatagram.getSequenceId());
122
123
124
125
126
127 BigEndianCodec.putInt(
128 datagram._headerBufferArray, _INDEX_DATA_SIZE,
129 byteBuffer.remaining());
130
131
132
133 datagram._dataByteBuffer = byteBuffer;
134
135 return datagram;
136 }
137
138 public ByteBuffer getDataByteBuffer() {
139 return _dataByteBuffer;
140 }
141
142 public byte getType() {
143 return _headerBufferArray[_INDEX_DATA_TYPE];
144 }
145
146 @Override
147 public String toString() {
148 StringBundler sb = new StringBundler(11);
149
150 sb.append("{dataChunk=");
151
152 ByteBuffer byteBuffer = _dataByteBuffer;
153
154 if (byteBuffer == null) {
155 sb.append(StringPool.NULL);
156 }
157 else {
158 sb.append(byteBuffer.toString());
159 }
160
161 sb.append(", dataSize=");
162 sb.append(BigEndianCodec.getInt(_headerBufferArray, _INDEX_DATA_SIZE));
163 sb.append(", dataType=");
164 sb.append(_headerBufferArray[_INDEX_DATA_TYPE]);
165 sb.append(", sequenceId=");
166 sb.append(
167 BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID));
168 sb.append(", statusFlag=");
169 sb.append(_headerBufferArray[_INDEX_STATUS_FLAG]);
170 sb.append("}");
171
172 return sb.toString();
173 }
174
175 protected static Datagram createACKResponseDatagram(long sequenceId) {
176 Datagram datagram = new Datagram();
177
178
179
180 datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_ACK_RESPONSE;
181
182
183
184 BigEndianCodec.putLong(
185 datagram._headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
186
187
188
189
190
191 BigEndianCodec.putInt(datagram._headerBufferArray, _INDEX_DATA_SIZE, 0);
192
193
194
195 datagram._dataByteBuffer = _EMPTY_BUFFER;
196
197 return datagram;
198 }
199
200 protected static Datagram createReceiveDatagram() {
201 return new Datagram();
202 }
203
204 protected long getSequenceId() {
205 return BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID);
206 }
207
208 protected boolean isAckRequest() {
209 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
210
211 if ((statusFlag & _FLAG_ACK_REQUEST) != 0) {
212 return true;
213 }
214 else {
215 return false;
216 }
217 }
218
219 protected boolean isAckResponse() {
220 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
221
222 if ((statusFlag & _FLAG_ACK_RESPONSE) != 0) {
223 return true;
224 }
225 else {
226 return false;
227 }
228 }
229
230 protected boolean isRequest() {
231 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
232
233 if ((statusFlag & _FLAG_REQUEST) != 0) {
234 return true;
235 }
236 else {
237 return false;
238 }
239 }
240
241 protected boolean isResponse() {
242 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
243
244 if ((statusFlag & _FLAG_RESPONSE) != 0) {
245 return true;
246 }
247 else {
248 return false;
249 }
250 }
251
252 protected boolean readFrom(ScatteringByteChannel scatteringByteChannel)
253 throws IOException {
254
255 if (_headerByteBuffer.hasRemaining()) {
256 if (scatteringByteChannel.read(_headerByteBuffer) == -1) {
257 throw new EOFException();
258 }
259
260 if (_headerByteBuffer.hasRemaining()) {
261 return false;
262 }
263
264 int dataSize = BigEndianCodec.getInt(
265 _headerBufferArray, _INDEX_DATA_SIZE);
266
267 if (dataSize == 0) {
268 _dataByteBuffer = _EMPTY_BUFFER;
269
270 return true;
271 }
272
273 _dataByteBuffer = ByteBuffer.allocate(dataSize);
274 }
275
276 if (scatteringByteChannel.read(_dataByteBuffer) == -1) {
277 throw new EOFException();
278 }
279
280 if (_dataByteBuffer.hasRemaining()) {
281 return false;
282 }
283
284 _dataByteBuffer.flip();
285
286 return true;
287 }
288
289 protected void setAckRequest(boolean ackRequest) {
290 byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
291
292 if (ackRequest) {
293 statusFlag |= _FLAG_ACK_REQUEST;
294 }
295 else {
296 statusFlag &= ~_FLAG_ACK_REQUEST;
297 }
298
299 _headerBufferArray[_INDEX_STATUS_FLAG] = statusFlag;
300 }
301
302 protected void setSequenceId(long sequenceId) {
303 BigEndianCodec.putLong(
304 _headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
305 }
306
307 protected boolean writeTo(GatheringByteChannel gatheringByteChannel)
308 throws IOException {
309
310 if (_headerByteBuffer.hasRemaining()) {
311 ByteBuffer[] byteBuffers = new ByteBuffer[2];
312
313 byteBuffers[0] = _headerByteBuffer;
314 byteBuffers[1] = _dataByteBuffer;
315
316 gatheringByteChannel.write(byteBuffers);
317 }
318 else {
319 gatheringByteChannel.write(_dataByteBuffer);
320 }
321
322 if (_dataByteBuffer.hasRemaining()) {
323 return false;
324 }
325
326 _dataByteBuffer = null;
327
328 return true;
329 }
330
331 protected Object attachment;
332 protected CompletionHandler<Object> completionHandler;
333 protected EnumSet<CompletionType> completionTypes;
334 protected long expireTime;
335 protected long timeout;
336
337 private Datagram() {
338 _headerByteBuffer = ByteBuffer.allocate(_HEADER_SIZE);
339
340
341
342
343 _headerBufferArray = _headerByteBuffer.array();
344 }
345
346 private static final ByteBuffer _EMPTY_BUFFER = ByteBuffer.allocate(0);
347
348 private static final byte _FLAG_ACK_REQUEST = 1;
349
350 private static final byte _FLAG_ACK_RESPONSE = 2;
351
352 private static final byte _FLAG_REQUEST = 4;
353
354 private static final byte _FLAG_RESPONSE = 8;
355
356 private static final int _HEADER_SIZE = 14;
357
358 private static final int _INDEX_DATA_SIZE = 10;
359
360 private static final int _INDEX_DATA_TYPE = 9;
361
362 private static final int _INDEX_SEQUENCE_ID = 1;
363
364 private static final int _INDEX_STATUS_FLAG = 0;
365
366 private ByteBuffer _dataByteBuffer;
367 private final byte[] _headerBufferArray;
368 private final ByteBuffer _headerByteBuffer;
369
370 }