001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband;
016    
017    import com.liferay.portal.kernel.io.BigEndianCodec;
018    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
019    import com.liferay.portal.kernel.util.StringBundler;
020    import com.liferay.portal.kernel.util.StringPool;
021    
022    import java.io.EOFException;
023    import java.io.IOException;
024    
025    import java.nio.ByteBuffer;
026    import java.nio.channels.GatheringByteChannel;
027    import java.nio.channels.ScatteringByteChannel;
028    
029    import java.util.EnumSet;
030    
031    /**
032     * Represents the communication unit of Intraband.
033     *
034     * <p>
035     * Encodes/decodes data to/from big-endian byte order data format:
036     * </p>
037     *
038     * <p>
039     * <table border="1">
040     *
041     * <tr>
042     * <td>Name</td><td>Type</td><td>Size(byte)</td><td>Offset</td>
043     * </tr>
044     * <tr>
045     * <td>Status Flag</td><td>byte</td><td>1</td><td>0</td>
046     * </tr>
047     * <tr>
048     * <td>Sequence ID</td><td>long</td><td>8</td><td>1</td>
049     * </tr>
050     * <tr>
051     * <td>Data Type</td><td>byte</td><td>1</td><td>9</td>
052     * </tr>
053     * <tr>
054     * <td>Data Size</td><td>int</td><td>4</td><td>10</td>
055     * </tr>
056     * <tr>
057     * <td>Data Chunk</td>
058     * <td>byte[]</td>
059     * <td>
060     * <pre>${Data Size}</pre>
061     * </td> <td>14</td> </tr>
062     *
063     * </table>
064     * </p>
065     *
066     * @author Shuyang Zhou
067     */
068    public class Datagram {
069    
070            public static Datagram createRequestDatagram(byte type, byte[] data) {
071                    return createRequestDatagram(type, ByteBuffer.wrap(data));
072            }
073    
074            public static Datagram createRequestDatagram(
075                    byte type, ByteBuffer dataByteBuffer) {
076    
077                    Datagram datagram = new Datagram();
078    
079                    // Status flag
080    
081                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_REQUEST;
082    
083                    // Request datagram does not set the sequence ID
084    
085                    // Data type
086    
087                    datagram._headerBufferArray[_INDEX_DATA_TYPE] = type;
088    
089                    // Data size
090    
091                    BigEndianCodec.putInt(
092                            datagram._headerBufferArray, _INDEX_DATA_SIZE,
093                            dataByteBuffer.remaining());
094    
095                    // Data chunk
096    
097                    datagram._dataByteBuffer = dataByteBuffer;
098    
099                    return datagram;
100            }
101    
102            public static Datagram createResponseDatagram(
103                    Datagram requestDatagram, byte[] data) {
104    
105                    return createResponseDatagram(requestDatagram, ByteBuffer.wrap(data));
106            }
107    
108            public static Datagram createResponseDatagram(
109                    Datagram requestDatagram, ByteBuffer byteBuffer) {
110    
111                    Datagram datagram = new Datagram();
112    
113                    // Status flag
114    
115                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_RESPONSE;
116    
117                    // Sequence ID
118    
119                    BigEndianCodec.putLong(
120                            datagram._headerBufferArray, _INDEX_SEQUENCE_ID,
121                            requestDatagram.getSequenceId());
122    
123                    // Response datagram does not set the data type
124    
125                    // Data size
126    
127                    BigEndianCodec.putInt(
128                            datagram._headerBufferArray, _INDEX_DATA_SIZE,
129                            byteBuffer.remaining());
130    
131                    // Data chunk
132    
133                    datagram._dataByteBuffer = byteBuffer;
134    
135                    return datagram;
136            }
137    
138            public ByteBuffer getDataByteBuffer() {
139                    return _dataByteBuffer;
140            }
141    
142            public byte getType() {
143                    return _headerBufferArray[_INDEX_DATA_TYPE];
144            }
145    
146            @Override
147            public String toString() {
148                    StringBundler sb = new StringBundler(11);
149    
150                    sb.append("{dataChunk=");
151    
152                    ByteBuffer byteBuffer = _dataByteBuffer;
153    
154                    if (byteBuffer == null) {
155                            sb.append(StringPool.NULL);
156                    }
157                    else {
158                            sb.append(byteBuffer.toString());
159                    }
160    
161                    sb.append(", dataSize=");
162                    sb.append(BigEndianCodec.getInt(_headerBufferArray, _INDEX_DATA_SIZE));
163                    sb.append(", dataType=");
164                    sb.append(_headerBufferArray[_INDEX_DATA_TYPE]);
165                    sb.append(", sequenceId=");
166                    sb.append(
167                            BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID));
168                    sb.append(", statusFlag=");
169                    sb.append(_headerBufferArray[_INDEX_STATUS_FLAG]);
170                    sb.append("}");
171    
172                    return sb.toString();
173            }
174    
175            protected static Datagram createACKResponseDatagram(long sequenceId) {
176                    Datagram datagram = new Datagram();
177    
178                    // Status flag
179    
180                    datagram._headerBufferArray[_INDEX_STATUS_FLAG] = _FLAG_ACK_RESPONSE;
181    
182                    // Sequence ID
183    
184                    BigEndianCodec.putLong(
185                            datagram._headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
186    
187                    // ACK response datagram does not set the data type
188    
189                    // Data size
190    
191                    BigEndianCodec.putInt(datagram._headerBufferArray, _INDEX_DATA_SIZE, 0);
192    
193                    // Data chunk
194    
195                    datagram._dataByteBuffer = _EMPTY_BUFFER;
196    
197                    return datagram;
198            }
199    
200            protected static Datagram createReceiveDatagram() {
201                    return new Datagram();
202            }
203    
204            protected long getSequenceId() {
205                    return BigEndianCodec.getLong(_headerBufferArray, _INDEX_SEQUENCE_ID);
206            }
207    
208            protected boolean isAckRequest() {
209                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
210    
211                    if ((statusFlag & _FLAG_ACK_REQUEST) != 0) {
212                            return true;
213                    }
214                    else {
215                            return false;
216                    }
217            }
218    
219            protected boolean isAckResponse() {
220                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
221    
222                    if ((statusFlag & _FLAG_ACK_RESPONSE) != 0) {
223                            return true;
224                    }
225                    else {
226                            return false;
227                    }
228            }
229    
230            protected boolean isRequest() {
231                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
232    
233                    if ((statusFlag & _FLAG_REQUEST) != 0) {
234                            return true;
235                    }
236                    else {
237                            return false;
238                    }
239            }
240    
241            protected boolean isResponse() {
242                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
243    
244                    if ((statusFlag & _FLAG_RESPONSE) != 0) {
245                            return true;
246                    }
247                    else {
248                            return false;
249                    }
250            }
251    
252            protected boolean readFrom(ScatteringByteChannel scatteringByteChannel)
253                    throws IOException {
254    
255                    if (_headerByteBuffer.hasRemaining()) {
256                            if (scatteringByteChannel.read(_headerByteBuffer) == -1) {
257                                    throw new EOFException();
258                            }
259    
260                            if (_headerByteBuffer.hasRemaining()) {
261                                    return false;
262                            }
263    
264                            int dataSize = BigEndianCodec.getInt(
265                                    _headerBufferArray, _INDEX_DATA_SIZE);
266    
267                            if (dataSize == 0) {
268                                    _dataByteBuffer = _EMPTY_BUFFER;
269    
270                                    return true;
271                            }
272    
273                            _dataByteBuffer = ByteBuffer.allocate(dataSize);
274                    }
275    
276                    if (scatteringByteChannel.read(_dataByteBuffer) == -1) {
277                            throw new EOFException();
278                    }
279    
280                    if (_dataByteBuffer.hasRemaining()) {
281                            return false;
282                    }
283    
284                    _dataByteBuffer.flip();
285    
286                    return true;
287            }
288    
289            protected void setAckRequest(boolean ackRequest) {
290                    byte statusFlag = _headerBufferArray[_INDEX_STATUS_FLAG];
291    
292                    if (ackRequest) {
293                            statusFlag |= _FLAG_ACK_REQUEST;
294                    }
295                    else {
296                            statusFlag &= ~_FLAG_ACK_REQUEST;
297                    }
298    
299                    _headerBufferArray[_INDEX_STATUS_FLAG] = statusFlag;
300            }
301    
302            protected void setSequenceId(long sequenceId) {
303                    BigEndianCodec.putLong(
304                            _headerBufferArray, _INDEX_SEQUENCE_ID, sequenceId);
305            }
306    
307            protected boolean writeTo(GatheringByteChannel gatheringByteChannel)
308                    throws IOException {
309    
310                    if (_headerByteBuffer.hasRemaining()) {
311                            ByteBuffer[] byteBuffers = new ByteBuffer[2];
312    
313                            byteBuffers[0] = _headerByteBuffer;
314                            byteBuffers[1] = _dataByteBuffer;
315    
316                            gatheringByteChannel.write(byteBuffers);
317                    }
318                    else {
319                            gatheringByteChannel.write(_dataByteBuffer);
320                    }
321    
322                    if (_dataByteBuffer.hasRemaining()) {
323                            return false;
324                    }
325    
326                    _dataByteBuffer = null;
327    
328                    return true;
329            }
330    
331            protected Object attachment;
332            protected CompletionHandler<Object> completionHandler;
333            protected EnumSet<CompletionType> completionTypes;
334            protected long expireTime;
335            protected long timeout;
336    
337            private Datagram() {
338                    _headerByteBuffer = ByteBuffer.allocate(_HEADER_SIZE);
339    
340                    // Directly reference the interanl byte array for faster encoding and
341                    // decoding
342    
343                    _headerBufferArray = _headerByteBuffer.array();
344            }
345    
346            private static final ByteBuffer _EMPTY_BUFFER = ByteBuffer.allocate(0);
347    
348            private static final byte _FLAG_ACK_REQUEST = 1;
349    
350            private static final byte _FLAG_ACK_RESPONSE = 2;
351    
352            private static final byte _FLAG_REQUEST = 4;
353    
354            private static final byte _FLAG_RESPONSE = 8;
355    
356            private static final int _HEADER_SIZE = 14;
357    
358            private static final int _INDEX_DATA_SIZE = 10;
359    
360            private static final int _INDEX_DATA_TYPE = 9;
361    
362            private static final int _INDEX_SEQUENCE_ID = 1;
363    
364            private static final int _INDEX_STATUS_FLAG = 0;
365    
366            private ByteBuffer _dataByteBuffer;
367            private final byte[] _headerBufferArray;
368            private final ByteBuffer _headerByteBuffer;
369    
370    }