001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.rpc;
016    
017    import com.liferay.portal.kernel.io.Deserializer;
018    import com.liferay.portal.kernel.io.Serializer;
019    import com.liferay.portal.kernel.nio.intraband.CompletionHandler;
020    import com.liferay.portal.kernel.nio.intraband.CompletionHandler.CompletionType;
021    import com.liferay.portal.kernel.nio.intraband.Datagram;
022    import com.liferay.portal.kernel.nio.intraband.Intraband;
023    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
024    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
025    import com.liferay.portal.kernel.process.ProcessCallable;
026    
027    import java.io.IOException;
028    import java.io.Serializable;
029    
030    import java.util.EnumSet;
031    import java.util.concurrent.Callable;
032    import java.util.concurrent.Future;
033    import java.util.concurrent.FutureTask;
034    
035    /**
036     * @author Shuyang Zhou
037     */
038    public class IntrabandRPCUtil {
039    
040            public static <V extends Serializable> Future<V> execute(
041                            RegistrationReference registrationReference,
042                            ProcessCallable<V> processCallable)
043                    throws IntrabandRPCException {
044    
045                    Intraband intraband = registrationReference.getIntraband();
046    
047                    SystemDataType systemDataType = SystemDataType.RPC;
048    
049                    Serializer serializer = new Serializer();
050    
051                    serializer.writeObject(processCallable);
052    
053                    try {
054                            Datagram datagram = Datagram.createRequestDatagram(
055                                    systemDataType.getValue(), serializer.toByteBuffer());
056    
057                            FutureResult<V> futureResult = new FutureResult<V>();
058    
059                            intraband.sendDatagram(
060                                    registrationReference, datagram, null, repliedEnumSet,
061                                    new FutureCompletionHandler<V>(futureResult));
062    
063                            return futureResult;
064                    }
065                    catch (Exception e) {
066                            throw new IntrabandRPCException(e);
067                    }
068            }
069    
070            protected static Callable<Serializable> emptyCallable =
071                    new Callable<Serializable>() {
072    
073                    @Override
074                    public Serializable call() throws Exception {
075                            return null;
076                    }
077    
078            };
079    
080            protected static EnumSet<CompletionType> repliedEnumSet = EnumSet.of(
081                    CompletionType.REPLIED);
082    
083            protected static class FutureCompletionHandler<V extends Serializable>
084                    implements CompletionHandler<Object> {
085    
086                    protected FutureCompletionHandler(FutureResult<V> futureResult) {
087                            _futureResult = futureResult;
088                    }
089    
090                    @Override
091                    public void delivered(Object attachment) {
092                    }
093    
094                    @Override
095                    public void failed(Object attachment, IOException ioe) {
096                            _futureResult.setException(ioe);
097                    }
098    
099                    @Override
100                    public void replied(Object attachment, Datagram datagram) {
101                            Deserializer deserializer = new Deserializer(
102                                    datagram.getDataByteBuffer());
103    
104                            try {
105                                    V v = deserializer.readObject();
106    
107                                    _futureResult.set(v);
108                            }
109                            catch (ClassNotFoundException cnfe) {
110                                    _futureResult.setException(cnfe);
111                            }
112                    }
113    
114                    @Override
115                    public void submitted(Object attachment) {
116                    }
117    
118                    @Override
119                    public void timedOut(Object attachment) {
120                            _futureResult.cancel(true);
121                    }
122    
123                    private final FutureResult<V> _futureResult;
124    
125            }
126    
127            protected static class FutureResult<V extends Serializable>
128                    extends FutureTask<V> {
129    
130                    protected FutureResult() {
131                            super((Callable<V>)emptyCallable);
132                    }
133    
134                    @Override
135                    protected void set(V v) {
136                            super.set(v);
137                    }
138    
139                    @Override
140                    protected void setException(Throwable t) {
141                            super.setException(t);
142                    }
143    
144            }
145    
146    }