001
014
015 package com.liferay.portal.kernel.nio.intraband.nonblocking;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
020 import com.liferay.portal.kernel.nio.intraband.ChannelContext;
021 import com.liferay.portal.kernel.nio.intraband.Datagram;
022 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
023 import com.liferay.portal.kernel.util.NamedThreadFactory;
024
025 import java.io.IOException;
026
027 import java.nio.channels.CancelledKeyException;
028 import java.nio.channels.Channel;
029 import java.nio.channels.ClosedSelectorException;
030 import java.nio.channels.GatheringByteChannel;
031 import java.nio.channels.ScatteringByteChannel;
032 import java.nio.channels.SelectableChannel;
033 import java.nio.channels.SelectionKey;
034 import java.nio.channels.Selector;
035
036 import java.util.Iterator;
037 import java.util.Queue;
038 import java.util.Set;
039 import java.util.concurrent.Callable;
040 import java.util.concurrent.ConcurrentLinkedQueue;
041 import java.util.concurrent.FutureTask;
042 import java.util.concurrent.ThreadFactory;
043
044
047 public class SelectorIntraband extends BaseIntraband {
048
049 public SelectorIntraband(long defaultTimeout) throws IOException {
050 super(defaultTimeout);
051
052 pollingThread.start();
053 }
054
055 @Override
056 public void close() throws InterruptedException, IOException {
057 selector.close();
058
059 pollingThread.interrupt();
060
061 pollingThread.join(defaultTimeout);
062
063 super.close();
064 }
065
066 @Override
067 public RegistrationReference registerChannel(Channel channel)
068 throws IOException {
069
070 if (channel == null) {
071 throw new NullPointerException("Channel is null");
072 }
073
074 if (!(channel instanceof GatheringByteChannel)) {
075 throw new IllegalArgumentException(
076 "Channel is not of type GatheringByteChannel");
077 }
078
079 if (!(channel instanceof ScatteringByteChannel)) {
080 throw new IllegalArgumentException(
081 "Channel is not of type ScatteringByteChannel");
082 }
083
084 if (!(channel instanceof SelectableChannel)) {
085 throw new IllegalArgumentException(
086 "Channel is not of type SelectableChannel");
087 }
088
089 SelectableChannel selectableChannel = (SelectableChannel)channel;
090
091 if ((selectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
092 throw new IllegalArgumentException(
093 "Channel is not valid for reading");
094 }
095
096 if ((selectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
097 throw new IllegalArgumentException(
098 "Channel is not valid for writing");
099 }
100
101 ensureOpen();
102
103 selectableChannel.configureBlocking(false);
104
105 FutureTask<RegistrationReference> registerFutureTask =
106 new FutureTask<RegistrationReference>(
107 new RegisterCallable(selectableChannel, selectableChannel));
108
109 registerQueue.offer(registerFutureTask);
110
111 selector.wakeup();
112
113 try {
114 return registerFutureTask.get();
115 }
116 catch (Exception e) {
117 throw new IOException(e);
118 }
119 }
120
121 @Override
122 public RegistrationReference registerChannel(
123 ScatteringByteChannel scatteringByteChannel,
124 GatheringByteChannel gatheringByteChannel)
125 throws IOException {
126
127 if (scatteringByteChannel == null) {
128 throw new NullPointerException("Scattering byte channel is null");
129 }
130
131 if (gatheringByteChannel == null) {
132 throw new NullPointerException("Gathering byte channel is null");
133 }
134
135 if (!(scatteringByteChannel instanceof SelectableChannel)) {
136 throw new IllegalArgumentException(
137 "Scattering byte channel is not of type SelectableChannel");
138 }
139
140 if (!(gatheringByteChannel instanceof SelectableChannel)) {
141 throw new IllegalArgumentException(
142 "Gathering byte channel is not of type SelectableChannel");
143 }
144
145 SelectableChannel readSelectableChannel =
146 (SelectableChannel)scatteringByteChannel;
147 SelectableChannel writeSelectableChannel =
148 (SelectableChannel)gatheringByteChannel;
149
150 if ((readSelectableChannel.validOps() & SelectionKey.OP_READ) == 0) {
151 throw new IllegalArgumentException(
152 "Scattering byte channel is not valid for reading");
153 }
154
155 if ((writeSelectableChannel.validOps() & SelectionKey.OP_WRITE) == 0) {
156 throw new IllegalArgumentException(
157 "Gathering byte channel is not valid for writing");
158 }
159
160 ensureOpen();
161
162 readSelectableChannel.configureBlocking(false);
163 writeSelectableChannel.configureBlocking(false);
164
165 FutureTask<RegistrationReference> registerFutureTask =
166 new FutureTask<RegistrationReference>(
167 new RegisterCallable(
168 readSelectableChannel, writeSelectableChannel));
169
170 registerQueue.offer(registerFutureTask);
171
172 selector.wakeup();
173
174 try {
175 return registerFutureTask.get();
176 }
177 catch (Exception e) {
178 throw new IOException(e);
179 }
180 }
181
182 @Override
183 protected void doSendDatagram(
184 RegistrationReference registrationReference, Datagram datagram) {
185
186 SelectionKeyRegistrationReference selectionKeyRegistrationReference =
187 (SelectionKeyRegistrationReference)registrationReference;
188
189 SelectionKey writeSelectionKey =
190 selectionKeyRegistrationReference.writeSelectionKey;
191
192 ChannelContext channelContext =
193 (ChannelContext)writeSelectionKey.attachment();
194
195 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
196
197 sendingQueue.offer(datagram);
198
199 synchronized (writeSelectionKey) {
200 int ops = writeSelectionKey.interestOps();
201
202 if ((ops & SelectionKey.OP_WRITE) == 0) {
203 ops |= SelectionKey.OP_WRITE;
204
205 writeSelectionKey.interestOps(ops);
206
207 selector.wakeup();
208 }
209 }
210 }
211
212 protected void registerChannels() {
213 FutureTask<RegistrationReference> registerFuturetask = null;
214
215 while ((registerFuturetask = registerQueue.poll()) != null) {
216 registerFuturetask.run();
217 }
218 }
219
220 protected static final ThreadFactory threadFactory =
221 new NamedThreadFactory(
222 SelectorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
223 SelectorIntraband.class.getClassLoader());
224
225 protected final Thread pollingThread = threadFactory.newThread(
226 new PollingJob());
227 protected final Queue<FutureTask<RegistrationReference>> registerQueue =
228 new ConcurrentLinkedQueue<FutureTask<RegistrationReference>>();
229 protected final Selector selector = Selector.open();
230
231 protected class RegisterCallable
232 implements Callable<RegistrationReference> {
233
234 public RegisterCallable(
235 SelectableChannel readSelectableChannel,
236 SelectableChannel writeSelectableChannel) {
237
238 _readSelectableChannel = readSelectableChannel;
239 _writeSelectableChannel = writeSelectableChannel;
240 }
241
242 @Override
243 public RegistrationReference call() throws Exception {
244 if (_readSelectableChannel == _writeSelectableChannel) {
245
246
247
248
249
250 SelectionKey selectionKey = _readSelectableChannel.register(
251 selector, 0);
252
253 SelectionKeyRegistrationReference
254 selectionKeyRegistrationReference =
255 new SelectionKeyRegistrationReference(
256 SelectorIntraband.this, selectionKey, selectionKey);
257
258 ChannelContext channelContext = new ChannelContext(
259 new ConcurrentLinkedQueue<Datagram>());
260
261 channelContext.setRegistrationReference(
262 selectionKeyRegistrationReference);
263
264 selectionKey.attach(channelContext);
265
266
267
268 selectionKey.interestOps(SelectionKey.OP_READ);
269
270 return selectionKeyRegistrationReference;
271 }
272 else {
273
274
275
276
277
278 SelectionKey readSelectionKey = _readSelectableChannel.register(
279 selector, 0);
280
281 SelectionKey writeSelectionKey =
282 _writeSelectableChannel.register(selector, 0);
283
284 SelectionKeyRegistrationReference
285 selectionKeyRegistrationReference =
286 new SelectionKeyRegistrationReference(
287 SelectorIntraband.this, readSelectionKey,
288 writeSelectionKey);
289
290 ChannelContext channelContext = new ChannelContext(
291 new ConcurrentLinkedQueue<Datagram>());
292
293 channelContext.setRegistrationReference(
294 selectionKeyRegistrationReference);
295
296 readSelectionKey.attach(channelContext);
297 writeSelectionKey.attach(channelContext);
298
299
300
301 readSelectionKey.interestOps(SelectionKey.OP_READ);
302
303 return selectionKeyRegistrationReference;
304 }
305 }
306
307 private final SelectableChannel _readSelectableChannel;
308 private final SelectableChannel _writeSelectableChannel;
309
310 }
311
312 private void _processReading(SelectionKey selectionKey) {
313 ScatteringByteChannel scatteringByteChannel =
314 (ScatteringByteChannel)selectionKey.channel();
315
316 ChannelContext channelContext =
317 (ChannelContext)selectionKey.attachment();
318
319 handleReading(scatteringByteChannel, channelContext);
320 }
321
322 private void _processWriting(SelectionKey selectionKey) {
323 GatheringByteChannel gatheringByteChannel =
324 (GatheringByteChannel)selectionKey.channel();
325
326 ChannelContext channelContext =
327 (ChannelContext)selectionKey.attachment();
328
329 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
330
331 if (channelContext.getWritingDatagram() == null) {
332 channelContext.setWritingDatagram(sendingQueue.poll());
333 }
334
335 boolean backOff = false;
336
337 if (channelContext.getWritingDatagram() != null) {
338 if (handleWriting(gatheringByteChannel, channelContext)) {
339 if (sendingQueue.isEmpty()) {
340 backOff = true;
341 }
342 }
343 }
344 else {
345 backOff = true;
346 }
347
348 if (backOff) {
349
350
351
352
353 int ops = selectionKey.interestOps();
354
355 ops &= ~SelectionKey.OP_WRITE;
356
357 synchronized (selectionKey) {
358 if (sendingQueue.isEmpty()) {
359 selectionKey.interestOps(ops);
360 }
361 }
362 }
363 }
364
365 private static Log _log = LogFactoryUtil.getLog(SelectorIntraband.class);
366
367 private class PollingJob implements Runnable {
368
369 @Override
370 public void run() {
371 try {
372 try {
373 while (true) {
374 int readyCount = selector.select();
375
376 if (readyCount > 0) {
377 Set<SelectionKey> selectionKeys =
378 selector.selectedKeys();
379
380 Iterator<SelectionKey> iterator =
381 selectionKeys.iterator();
382
383 while (iterator.hasNext()) {
384 SelectionKey selectionKey = iterator.next();
385
386 iterator.remove();
387
388 try {
389 if (selectionKey.isReadable()) {
390 _processReading(selectionKey);
391 }
392
393 if (selectionKey.isWritable()) {
394 _processWriting(selectionKey);
395 }
396 }
397 catch (CancelledKeyException cke) {
398
399
400
401 }
402 }
403 }
404 else if (!selector.isOpen()) {
405 break;
406 }
407
408 registerChannels();
409 cleanUpTimeoutResponseWaitingDatagrams();
410 }
411 }
412 finally {
413 selector.close();
414 }
415 }
416 catch (ClosedSelectorException cse) {
417 if (_log.isInfoEnabled()) {
418 Thread currentThread = Thread.currentThread();
419
420 _log.info(
421 currentThread.getName() +
422 " exiting gracefully on selector closure");
423 }
424 }
425 catch (Throwable t) {
426 Thread currentThread = Thread.currentThread();
427
428 _log.error(
429 currentThread.getName() + " exiting exceptionally", t);
430 }
431
432
433
434
435 registerChannels();
436
437 responseWaitingMap.clear();
438 timeoutMap.clear();
439 }
440
441 }
442
443 }