001
014
015 package com.liferay.portal.kernel.nio.intraband.rpc;
016
017 import com.liferay.portal.kernel.io.Deserializer;
018 import com.liferay.portal.kernel.io.Serializer;
019 import com.liferay.portal.kernel.nio.intraband.CompletionHandler;
020 import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
021 import com.liferay.portal.kernel.nio.intraband.Datagram;
022 import com.liferay.portal.kernel.nio.intraband.Intraband;
023 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
024 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
025 import com.liferay.portal.kernel.process.ProcessCallable;
026
027 import java.io.IOException;
028 import java.io.Serializable;
029
030 import java.util.EnumSet;
031 import java.util.concurrent.Callable;
032 import java.util.concurrent.Future;
033 import java.util.concurrent.FutureTask;
034
035
038 public class IntrabandRPCUtil {
039
040 public static <V extends Serializable> Future<V> execute(
041 RegistrationReference registrationReference,
042 ProcessCallable<V> processCallable)
043 throws IntrabandRPCException {
044
045 Intraband intraband = registrationReference.getIntraband();
046
047 SystemDataType systemDataType = SystemDataType.RPC;
048
049 Serializer serializer = new Serializer();
050
051 serializer.writeObject(processCallable);
052
053 try {
054 Datagram datagram = Datagram.createRequestDatagram(
055 systemDataType.getValue(), serializer.toByteBuffer());
056
057 FutureResult<V> futureResult = new FutureResult<V>();
058
059 intraband.sendDatagram(
060 registrationReference, datagram, null, repliedEnumSet,
061 new FutureCompletionHandler<V>(futureResult));
062
063 return futureResult;
064 }
065 catch (Exception e) {
066 throw new IntrabandRPCException(e);
067 }
068 }
069
070 protected static Callable<Serializable> emptyCallable =
071 new Callable<Serializable>() {
072
073 @Override
074 public Serializable call() throws Exception {
075 return null;
076 }
077
078 };
079
080 protected static EnumSet<CompletionType> repliedEnumSet = EnumSet.of(
081 CompletionType.REPLIED);
082
083 protected static class FutureCompletionHandler<V extends Serializable>
084 implements CompletionHandler<Object> {
085
086 protected FutureCompletionHandler(FutureResult<V> futureResult) {
087 _futureResult = futureResult;
088 }
089
090 @Override
091 public void delivered(Object attachment) {
092 }
093
094 @Override
095 public void failed(Object attachment, IOException ioe) {
096 _futureResult.setException(ioe);
097 }
098
099 @Override
100 public void replied(Object attachment, Datagram datagram) {
101 Deserializer deserializer = new Deserializer(
102 datagram.getDataByteBuffer());
103
104 try {
105 V v = deserializer.readObject();
106
107 _futureResult.set(v);
108 }
109 catch (ClassNotFoundException cnfe) {
110 _futureResult.setException(cnfe);
111 }
112 }
113
114 @Override
115 public void submitted(Object attachment) {
116 }
117
118 @Override
119 public void timedOut(Object attachment) {
120 _futureResult.cancel(true);
121 }
122
123 private final FutureResult<V> _futureResult;
124
125 }
126
127 protected static class FutureResult<V extends Serializable>
128 extends FutureTask<V> {
129
130 protected FutureResult() {
131 super((Callable<V>)emptyCallable);
132 }
133
134 @Override
135 protected void set(V v) {
136 super.set(v);
137 }
138
139 @Override
140 protected void setException(Throwable t) {
141 super.setException(t);
142 }
143
144 }
145
146 }