001
014
015 package com.liferay.portal.kernel.nio.intraband;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
020
021 import java.io.IOException;
022
023 import java.nio.channels.GatheringByteChannel;
024 import java.nio.channels.ScatteringByteChannel;
025
026 import java.util.EnumSet;
027 import java.util.Iterator;
028 import java.util.Map;
029 import java.util.NavigableMap;
030 import java.util.Set;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.ConcurrentSkipListMap;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.TimeoutException;
036 import java.util.concurrent.atomic.AtomicLong;
037 import java.util.concurrent.atomic.AtomicReference;
038
039
042 public abstract class BaseIntraband implements Intraband {
043
/**
 * Creates an intraband that remembers the given timeout as its default.
 *
 * @param defaultTimeout the fallback timeout; the send methods of this
 *        class treat it as a millisecond value whenever the caller passes
 *        a non-positive timeout
 */
044 public BaseIntraband(long defaultTimeout) {
045 this.defaultTimeout = defaultTimeout;
046 }
047
048 @Override
049 @SuppressWarnings("unused")
050 public void close() throws InterruptedException, IOException {
051 datagramReceiveHandlersReference.set(null);
052
053 open = false;
054 }
055
056 @Override
057 public DatagramReceiveHandler[] getDatagramReceiveHandlers() {
058 ensureOpen();
059
060 DatagramReceiveHandler[] datagramReceiveHandlers =
061 datagramReceiveHandlersReference.get();
062
063 return datagramReceiveHandlers.clone();
064 }
065
/**
 * Returns the current value of the open flag. The flag is initialized to
 * <code>true</code> and is cleared by {@link #close()}.
 *
 * @return <code>true</code> if this intraband has not been closed
 */
066 @Override
067 public boolean isOpen() {
068 return open;
069 }
070
071 @Override
072 public DatagramReceiveHandler registerDatagramReceiveHandler(
073 byte type, DatagramReceiveHandler datagramReceiveHandler) {
074
075 ensureOpen();
076
077 int index = type & 0xFF;
078
079 DatagramReceiveHandler oldDatagramReceiveHandler = null;
080 DatagramReceiveHandler[] datagramReceiveHandlers = null;
081 DatagramReceiveHandler[] copyDatagramReceiveHandlers = null;
082
083 do {
084 datagramReceiveHandlers = datagramReceiveHandlersReference.get();
085
086 copyDatagramReceiveHandlers = datagramReceiveHandlers.clone();
087
088 oldDatagramReceiveHandler = copyDatagramReceiveHandlers[index];
089
090 copyDatagramReceiveHandlers[index] = datagramReceiveHandler;
091 }
092 while (
093 !datagramReceiveHandlersReference.compareAndSet(
094 datagramReceiveHandlers, copyDatagramReceiveHandlers));
095
096 return oldDatagramReceiveHandler;
097 }
098
099 @Override
100 public void sendDatagram(
101 RegistrationReference registrationReference, Datagram datagram) {
102
103 if (registrationReference == null) {
104 throw new NullPointerException("Registration reference is null");
105 }
106
107 if (!registrationReference.isValid()) {
108 throw new IllegalArgumentException(
109 "Registration reference is invalid");
110 }
111
112 if (datagram == null) {
113 throw new NullPointerException("Datagram is null");
114 }
115
116 ensureOpen();
117
118 doSendDatagram(registrationReference, datagram);
119 }
120
/**
 * Sends a datagram with completion callbacks, using this intraband's
 * default timeout.
 *
 * <p>
 * Delegates to the seven-argument overload, passing
 * <code>defaultTimeout</code> with {@link TimeUnit#MILLISECONDS}; all
 * validation is performed there.
 * </p>
 *
 * @param registrationReference the target registration
 * @param datagram the datagram to send
 * @param attachment the object passed back to the completion handler
 * @param completionTypes the completion events the caller is interested in
 * @param completionHandler the callback receiving completion events
 */
121 @Override
122 public <A> void sendDatagram(
123 RegistrationReference registrationReference, Datagram datagram,
124 A attachment, EnumSet<CompletionHandler.CompletionType> completionTypes,
125 CompletionHandler<A> completionHandler) {
126
127 sendDatagram(
128 registrationReference, datagram, attachment, completionTypes,
129 completionHandler, defaultTimeout, TimeUnit.MILLISECONDS);
130 }
131
132 @Override
133 public <A> void sendDatagram(
134 RegistrationReference registrationReference, Datagram datagram,
135 A attachment, EnumSet<CompletionType> completionTypes,
136 CompletionHandler<A> completionHandler, long timeout,
137 TimeUnit timeUnit) {
138
139 if (registrationReference == null) {
140 throw new NullPointerException("Registration reference is null");
141 }
142
143 if (!registrationReference.isValid()) {
144 throw new IllegalArgumentException(
145 "Registration reference is invalid");
146 }
147
148 if (datagram == null) {
149 throw new NullPointerException("Datagram is null");
150 }
151
152 if (completionTypes == null) {
153 throw new NullPointerException("Completion type set is null");
154 }
155
156 if (completionTypes.isEmpty()) {
157 throw new IllegalArgumentException("Completion type set is empty");
158 }
159
160 if (completionHandler == null) {
161 throw new NullPointerException("Complete handler is null");
162 }
163
164 if (timeUnit == null) {
165 throw new NullPointerException("Time unit is null");
166 }
167
168 if (timeout <= 0) {
169 timeout = defaultTimeout;
170 }
171 else {
172 timeout = timeUnit.toMillis(timeout);
173 }
174
175 ensureOpen();
176
177 datagram.attachment = attachment;
178 datagram.completionHandler =
179 (CompletionHandler<Object>)completionHandler;
180 datagram.completionTypes = completionTypes;
181 datagram.timeout = timeout;
182
183 datagram.setAckRequest(
184 completionTypes.contains(CompletionType.DELIVERED));
185
186 if (datagram.getSequenceId() == 0) {
187 datagram.setSequenceId(generateSequenceId());
188 }
189
190 if (completionTypes.contains(CompletionType.DELIVERED) ||
191 completionTypes.contains(CompletionType.REPLIED)) {
192
193 addResponseWaitingDatagram(datagram);
194 }
195
196 doSendDatagram(registrationReference, datagram);
197 }
198
/**
 * Sends a datagram and waits for its reply, using this intraband's default
 * timeout.
 *
 * <p>
 * Delegates to the four-argument overload, passing
 * <code>defaultTimeout</code> with {@link TimeUnit#MILLISECONDS}; all
 * validation is performed there.
 * </p>
 *
 * @param  registrationReference the target registration
 * @param  datagram the datagram to send
 * @return whatever the four-argument overload returns
 * @throws InterruptedException propagated from the four-argument overload
 * @throws IOException propagated from the four-argument overload
 * @throws TimeoutException propagated from the four-argument overload
 */
199 @Override
200 public Datagram sendSyncDatagram(
201 RegistrationReference registrationReference, Datagram datagram)
202 throws InterruptedException, IOException, TimeoutException {
203
204 return sendSyncDatagram(
205 registrationReference, datagram, defaultTimeout,
206 TimeUnit.MILLISECONDS);
207 }
208
209 @Override
210 public Datagram sendSyncDatagram(
211 RegistrationReference registrationReference, Datagram datagram,
212 long timeout, TimeUnit timeUnit)
213 throws InterruptedException, IOException, TimeoutException {
214
215 if (registrationReference == null) {
216 throw new NullPointerException("Registration reference is null");
217 }
218
219 if (!registrationReference.isValid()) {
220 throw new IllegalArgumentException(
221 "Registration reference is invalid");
222 }
223
224 if (datagram == null) {
225 throw new NullPointerException("Datagram is null");
226 }
227
228 if (timeUnit == null) {
229 throw new NullPointerException("Time unit is null");
230 }
231
232 if (timeout <= 0) {
233 timeout = defaultTimeout;
234 }
235 else {
236 timeout = timeUnit.toMillis(timeout);
237 }
238
239 ensureOpen();
240
241 return doSendSyncDatagram(registrationReference, datagram, timeout);
242 }
243
/**
 * Clears the handler slot of the given datagram type by registering
 * <code>null</code> for it.
 *
 * @param  type the datagram type whose handler is removed
 * @return the handler that was registered for the type, as returned by
 *         {@link #registerDatagramReceiveHandler}
 */
244 @Override
245 public DatagramReceiveHandler unregisterDatagramReceiveHandler(byte type) {
246 return registerDatagramReceiveHandler(type, null);
247 }
248
249 protected void addResponseWaitingDatagram(Datagram requestDatagram) {
250 long sequenceId = requestDatagram.getSequenceId();
251
252 long expireTime = System.currentTimeMillis() + requestDatagram.timeout;
253
254 requestDatagram.expireTime = expireTime;
255
256 responseWaitingMap.put(sequenceId, requestDatagram);
257
258 timeoutMap.put(expireTime, sequenceId);
259 }
260
261 protected void cleanUpTimeoutResponseWaitingDatagrams() {
262 Map<Long, Long> map = timeoutMap.headMap(
263 System.currentTimeMillis(), true);
264
265 if (map.isEmpty()) {
266 return;
267 }
268
269 Set<Map.Entry<Long, Long>> set = map.entrySet();
270
271 Iterator<Map.Entry<Long, Long>> iterator = set.iterator();
272
273 while (iterator.hasNext()) {
274 Map.Entry<Long, Long> entry = iterator.next();
275
276 iterator.remove();
277
278 Long sequenceId = entry.getValue();
279
280 Datagram datagram = responseWaitingMap.remove(sequenceId);
281
282 if (_log.isWarnEnabled()) {
283 _log.warn(
284 "Removed timeout response waiting datagram " + datagram);
285 }
286
287 datagram.completionHandler.timedOut(datagram.attachment);
288 }
289 }
290
/**
 * Performs the actual transmission of a datagram. The send methods of this
 * class call it after argument validation and bookkeeping, and the read
 * path calls it to send ACK responses.
 *
 * @param registrationReference the registration the datagram is sent to
 * @param datagram the datagram to transmit
 */
291 protected abstract void doSendDatagram(
292 RegistrationReference registrationReference, Datagram datagram);
293
294 protected Datagram doSendSyncDatagram(
295 RegistrationReference registrationReference, Datagram datagram,
296 long timeout)
297 throws InterruptedException, IOException, TimeoutException {
298
299 SendSyncDatagramCompletionHandler sendSyncDatagramCompletionHandler =
300 new SendSyncDatagramCompletionHandler();
301
302 datagram.completionHandler = sendSyncDatagramCompletionHandler;
303 datagram.completionTypes = REPLIED_ENUM_SET;
304 datagram.timeout = timeout;
305
306 if (datagram.getSequenceId() == 0) {
307 datagram.setSequenceId(generateSequenceId());
308 }
309
310 addResponseWaitingDatagram(datagram);
311
312 doSendDatagram(registrationReference, datagram);
313
314 return sendSyncDatagramCompletionHandler.waitResult(timeout);
315 }
316
317 protected void ensureOpen() {
318 if (!isOpen()) {
319 throw new ClosedIntrabandException();
320 }
321 }
322
323 protected long generateSequenceId() {
324 long sequenceId = sequenceIdGenerator.getAndIncrement();
325
326 if (sequenceId < 0) {
327
328
329
330
331
332
333
334
335
336
337
338
339
340 sequenceId += Long.MIN_VALUE;
341 }
342
343 return sequenceId;
344 }
345
346 protected void handleReading(
347 ScatteringByteChannel scatteringByteChannel,
348 ChannelContext channelContext) {
349
350 Datagram datagram = channelContext.getReadingDatagram();
351
352 if (datagram == null) {
353 datagram = Datagram.createReceiveDatagram();
354
355 channelContext.setReadingDatagram(datagram);
356 }
357
358 try {
359 if (datagram.readFrom(scatteringByteChannel)) {
360 channelContext.setReadingDatagram(
361 Datagram.createReceiveDatagram());
362
363 if (datagram.isAckResponse()) {
364 Datagram requestDatagram = removeResponseWaitingDatagram(
365 datagram);
366
367 if (requestDatagram == null) {
368 if (_log.isWarnEnabled()) {
369 _log.warn(
370 "Dropped ownerless ACK response " + datagram);
371 }
372 }
373 else {
374 CompletionHandler<Object> completionHandler =
375 requestDatagram.completionHandler;
376
377 completionHandler.delivered(requestDatagram.attachment);
378 }
379 }
380 else if (datagram.isResponse()) {
381 Datagram requestDatagram = removeResponseWaitingDatagram(
382 datagram);
383
384 if (requestDatagram == null) {
385 if (_log.isWarnEnabled()) {
386 _log.warn("Dropped ownerless response " + datagram);
387 }
388 }
389 else {
390 EnumSet<CompletionType> completionTypes =
391 requestDatagram.completionTypes;
392
393 if (completionTypes.contains(CompletionType.REPLIED)) {
394 CompletionHandler<Object> completionHandler =
395 requestDatagram.completionHandler;
396
397 completionHandler.replied(
398 requestDatagram.attachment, datagram);
399 }
400 else if (_log.isWarnEnabled()) {
401 _log.warn(
402 "Dropped unconcerned response " + datagram);
403 }
404 }
405 }
406 else {
407 if (datagram.isAckRequest()) {
408 Datagram ackResponseDatagram =
409 Datagram.createACKResponseDatagram(
410 datagram.getSequenceId());
411
412 doSendDatagram(
413 channelContext.getRegistrationReference(),
414 ackResponseDatagram);
415 }
416
417 int index = datagram.getType() & 0xFF;
418
419 DatagramReceiveHandler datagramReceiveHandler =
420 datagramReceiveHandlersReference.get()[index];
421
422 if (datagramReceiveHandler == null) {
423 if (_log.isWarnEnabled()) {
424 _log.warn("Dropped ownerless request " + datagram);
425 }
426 }
427 else {
428 try {
429 datagramReceiveHandler.receive(
430 channelContext.getRegistrationReference(),
431 datagram);
432 }
433 catch (Throwable t) {
434 _log.error("Unable to dispatch", t);
435 }
436 }
437 }
438 }
439 }
440 catch (IOException ioe) {
441 RegistrationReference registrationReference =
442 channelContext.getRegistrationReference();
443
444 registrationReference.cancelRegistration();
445
446 if (_log.isDebugEnabled()) {
447 _log.debug(
448 "Broken read channel, unregister " + registrationReference,
449 ioe);
450 }
451 else if (_log.isInfoEnabled()) {
452 _log.info(
453 "Broken read channel, unregister " + registrationReference);
454 }
455 }
456 }
457
458 protected boolean handleWriting(
459 GatheringByteChannel gatheringByteChannel,
460 ChannelContext channelContext) {
461
462 Datagram datagram = channelContext.getWritingDatagram();
463
464 try {
465 if (datagram.writeTo(gatheringByteChannel)) {
466 channelContext.setWritingDatagram(null);
467
468 EnumSet<CompletionType> completionTypes =
469 datagram.completionTypes;
470
471 if (completionTypes != null) {
472 if (completionTypes.contains(CompletionType.SUBMITTED)) {
473 CompletionHandler<Object> completeHandler =
474 datagram.completionHandler;
475
476 completeHandler.submitted(datagram.attachment);
477 }
478 }
479
480 return true;
481 }
482 else {
483 return false;
484 }
485 }
486 catch (IOException ioe) {
487 RegistrationReference registrationReference =
488 channelContext.getRegistrationReference();
489
490 registrationReference.cancelRegistration();
491
492 CompletionHandler<Object> completionHandler =
493 datagram.completionHandler;
494
495 if (completionHandler != null) {
496 completionHandler.failed(datagram.attachment, ioe);
497 }
498
499 if (_log.isDebugEnabled()) {
500 _log.debug(
501 "Broken write channel, unregister " + registrationReference,
502 ioe);
503 }
504 else if (_log.isInfoEnabled()) {
505 _log.info(
506 "Broken write channel, unregister " +
507 registrationReference);
508 }
509
510 return false;
511 }
512 }
513
514 protected Datagram removeResponseWaitingDatagram(
515 Datagram responseDatagram) {
516
517 long sequenceId = responseDatagram.getSequenceId();
518
519 Datagram requestDatagram = responseWaitingMap.remove(sequenceId);
520
521 if (requestDatagram != null) {
522 timeoutMap.remove(requestDatagram.expireTime);
523 }
524
525 return requestDatagram;
526 }
527
// Completion type set holding only REPLIED; assigned to datagrams sent
// through doSendSyncDatagram
528 protected static final EnumSet<CompletionType> REPLIED_ENUM_SET =
529 EnumSet.of(CompletionType.REPLIED);
530
// Fallback timeout; the send methods use it as a millisecond value when the
// caller passes a non-positive timeout
531 protected final long defaultTimeout;
// Receive handlers in a 256 slot array indexed by (type & 0xFF). The array
// is replaced as a whole by compareAndSet on registration and set to null
// by close().
532 protected final AtomicReference<DatagramReceiveHandler[]>
533 datagramReceiveHandlersReference =
534 new AtomicReference<DatagramReceiveHandler[]>(
535 new DatagramReceiveHandler[256]);
// Open flag, true until close() clears it; volatile because it is read by
// ensureOpen() without any lock
536 protected volatile boolean open = true;
// Sequence ID -> request datagram that is waiting for an ACK or a reply
537 protected final Map<Long, Datagram> responseWaitingMap =
538 new ConcurrentHashMap<Long, Datagram>();
// Counter backing generateSequenceId()
539 protected final AtomicLong sequenceIdGenerator = new AtomicLong();
// Expire time (System.currentTimeMillis() based) -> sequence ID, sorted so
// that the expired entries can be taken as a head view
540 protected final NavigableMap<Long, Long> timeoutMap =
541 new ConcurrentSkipListMap<Long, Long>();
542
543 protected static class SendSyncDatagramCompletionHandler
544 implements CompletionHandler<Object> {
545
546 @Override
547 public void delivered(Object attachment) {
548 }
549
550 @Override
551 public void failed(Object attachment, IOException ioe) {
552
553
554
555 _ioe = ioe;
556
557 _countDownLatch.countDown();
558 }
559
560 @Override
561 public void replied(Object attachment, Datagram datagram) {
562
563
564
565 _datagram = datagram;
566
567 _countDownLatch.countDown();
568 }
569
570 @Override
571 public void submitted(Object attachment) {
572 }
573
574 @Override
575 public void timedOut(Object attachment) {
576 }
577
578 public Datagram waitResult(long timeout)
579 throws InterruptedException, IOException, TimeoutException {
580
581 boolean result = _countDownLatch.await(
582 timeout, TimeUnit.MILLISECONDS);
583
584 if (!result) {
585 throw new TimeoutException("Result waiting timeout");
586 }
587
588 if (_ioe != null) {
589 throw _ioe;
590 }
591
592 return _datagram;
593 }
594
595 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
596 private Datagram _datagram;
597 private IOException _ioe;
598
599 }
600
601 private static Log _log = LogFactoryUtil.getLog(BaseIntraband.class);
602
603 }