001
014
015 package com.liferay.portal.kernel.nio.intraband.blocking;
016
017 import com.liferay.portal.kernel.nio.intraband.BaseIntraband;
018 import com.liferay.portal.kernel.nio.intraband.ChannelContext;
019 import com.liferay.portal.kernel.nio.intraband.Datagram;
020 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
021 import com.liferay.portal.kernel.util.NamedThreadFactory;
022
023 import java.io.IOException;
024
025 import java.nio.channels.Channel;
026 import java.nio.channels.GatheringByteChannel;
027 import java.nio.channels.ScatteringByteChannel;
028 import java.nio.channels.SelectableChannel;
029
030 import java.util.Queue;
031 import java.util.concurrent.BlockingQueue;
032 import java.util.concurrent.Callable;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.ExecutorService;
035 import java.util.concurrent.Executors;
036 import java.util.concurrent.Future;
037 import java.util.concurrent.LinkedBlockingQueue;
038 import java.util.concurrent.ThreadFactory;
039 import java.util.concurrent.TimeUnit;
040
041
044 public class ExecutorIntraband extends BaseIntraband {
045
046 public ExecutorIntraband(long defaultTimeout) {
047 super(defaultTimeout);
048 }
049
050 @Override
051 public void close() throws InterruptedException, IOException {
052 executorService.shutdownNow();
053
054 executorService.awaitTermination(defaultTimeout, TimeUnit.MILLISECONDS);
055
056 super.close();
057 }
058
059 @Override
060 public RegistrationReference registerChannel(Channel channel) {
061 if (channel == null) {
062 throw new NullPointerException("Channel is null");
063 }
064
065 if (!(channel instanceof GatheringByteChannel)) {
066 throw new IllegalArgumentException(
067 "Channel is not of type GatheringByteChannel");
068 }
069
070 if (!(channel instanceof ScatteringByteChannel)) {
071 throw new IllegalArgumentException(
072 "Channel is not of type ScatteringByteChannel");
073 }
074
075 if (channel instanceof SelectableChannel) {
076 SelectableChannel selectableChannel = (SelectableChannel)channel;
077
078 if (!selectableChannel.isBlocking()) {
079 throw new IllegalArgumentException(
080 "Channel is of type SelectableChannel and " +
081 "configured in nonblocking mode");
082 }
083 }
084
085 ensureOpen();
086
087 return doRegisterChannel(
088 (ScatteringByteChannel)channel, (GatheringByteChannel)channel);
089 }
090
091 @Override
092 public RegistrationReference registerChannel(
093 ScatteringByteChannel scatteringByteChannel,
094 GatheringByteChannel gatheringByteChannel) {
095
096 if (gatheringByteChannel == null) {
097 throw new NullPointerException("Gathering byte channel is null");
098 }
099
100 if (scatteringByteChannel == null) {
101 throw new NullPointerException("Scattering byte channel is null");
102 }
103
104 if (scatteringByteChannel instanceof SelectableChannel) {
105 SelectableChannel selectableChannel =
106 (SelectableChannel)scatteringByteChannel;
107
108 if (!selectableChannel.isBlocking()) {
109 throw new IllegalArgumentException(
110 "Scattering byte channel is of type SelectableChannel " +
111 "and configured in nonblocking mode");
112 }
113 }
114
115 if (gatheringByteChannel instanceof SelectableChannel) {
116 SelectableChannel selectableChannel =
117 (SelectableChannel)gatheringByteChannel;
118
119 if (!selectableChannel.isBlocking()) {
120 throw new IllegalArgumentException(
121 "Gathering byte channel is of type SelectableChannel and " +
122 "configured in nonblocking mode");
123 }
124 }
125
126 ensureOpen();
127
128 return doRegisterChannel(scatteringByteChannel, gatheringByteChannel);
129 }
130
131 protected RegistrationReference doRegisterChannel(
132 ScatteringByteChannel scatteringByteChannel,
133 GatheringByteChannel gatheringByteChannel) {
134
135 BlockingQueue<Datagram> sendingQueue =
136 new LinkedBlockingQueue<Datagram>();
137
138 ChannelContext channelContext = new ChannelContext(sendingQueue);
139
140 ReadingCallable readingCallable = new ReadingCallable(
141 scatteringByteChannel, channelContext);
142 WritingCallable writingCallable = new WritingCallable(
143 gatheringByteChannel, channelContext);
144
145
146
147
148
149 Future<Void> readFuture = executorService.submit(readingCallable);
150 Future<Void> writeFuture = executorService.submit(writingCallable);
151
152 FutureRegistrationReference futureRegistrationReference =
153 new FutureRegistrationReference(
154 this, channelContext, readFuture, writeFuture);
155
156 channelContext.setRegistrationReference(futureRegistrationReference);
157
158 readingCallable.openLatch();
159 writingCallable.openLatch();
160
161 return futureRegistrationReference;
162 }
163
164 @Override
165 protected void doSendDatagram(
166 RegistrationReference registrationReference, Datagram datagram) {
167
168 FutureRegistrationReference futureRegistrationReference =
169 (FutureRegistrationReference)registrationReference;
170
171 ChannelContext channelContext =
172 futureRegistrationReference.channelContext;
173
174 Queue<Datagram> sendingQueue = channelContext.getSendingQueue();
175
176 sendingQueue.offer(datagram);
177 }
178
179 protected static final ThreadFactory THREAD_FACTORY =
180 new NamedThreadFactory(
181 ExecutorIntraband.class + ".threadFactory", Thread.NORM_PRIORITY,
182 ExecutorIntraband.class.getClassLoader());
183
184 protected final ExecutorService executorService =
185 Executors.newCachedThreadPool(THREAD_FACTORY);
186
187 protected class ReadingCallable implements Callable<Void> {
188
189 public ReadingCallable(
190 ScatteringByteChannel scatteringByteChannel,
191 ChannelContext channelContext) {
192
193 _scatteringByteChannel = scatteringByteChannel;
194 _channelContext = channelContext;
195
196 _countDownLatch = new CountDownLatch(1);
197 }
198
199 @Override
200 public Void call() throws Exception {
201 _countDownLatch.await();
202
203 while (_scatteringByteChannel.isOpen()) {
204 handleReading(_scatteringByteChannel, _channelContext);
205 }
206
207 return null;
208 }
209
210 public void openLatch() {
211 _countDownLatch.countDown();
212 }
213
214 private final ChannelContext _channelContext;
215 private final CountDownLatch _countDownLatch;
216 private final ScatteringByteChannel _scatteringByteChannel;
217
218 }
219
220 protected class WritingCallable implements Callable<Void> {
221
222 public WritingCallable(
223 GatheringByteChannel gatheringByteChannel,
224 ChannelContext channelContext) {
225
226 _gatheringByteChannel = gatheringByteChannel;
227 _channelContext = channelContext;
228
229 _countDownLatch = new CountDownLatch(1);
230 }
231
232 @Override
233 public Void call() throws Exception {
234 _countDownLatch.await();
235
236 try {
237 BlockingQueue<Datagram> sendingQueue =
238 (BlockingQueue<Datagram>)_channelContext.getSendingQueue();
239
240 while (true) {
241 Datagram datagram = sendingQueue.take();
242
243 _channelContext.setWritingDatagram(datagram);
244
245 if (!handleWriting(
246 _gatheringByteChannel, _channelContext)) {
247
248 if (_gatheringByteChannel.isOpen()) {
249
250
251
252
253 throw new IllegalStateException(
254 _gatheringByteChannel +
255 " behaved in nonblocking way.");
256 }
257 else {
258 break;
259 }
260 }
261
262 cleanUpTimeoutResponseWaitingDatagrams();
263 }
264 }
265 catch (InterruptedException ie) {
266 }
267
268 return null;
269 }
270
271 public void openLatch() {
272 _countDownLatch.countDown();
273 }
274
275 private final ChannelContext _channelContext;
276 private final CountDownLatch _countDownLatch;
277 private final GatheringByteChannel _gatheringByteChannel;
278
279 }
280
281 }