001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.io.Deserializer;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.InputStream;
022    import java.io.OutputStream;
023    
024    import java.nio.ByteBuffer;
025    
026    import java.util.concurrent.CountDownLatch;
027    
028    import org.jgroups.Address;
029    import org.jgroups.Message;
030    import org.jgroups.Receiver;
031    import org.jgroups.View;
032    
033    /**
034     * @author Tina Tian
035     */
036    public abstract class BaseReceiver implements Receiver {
037    
038            @Override
039            public void block() {
040            }
041    
042            @Override
043            public void getState(OutputStream outputStream) throws Exception {
044            }
045    
046            public View getView() {
047                    return _view;
048            }
049    
050            public void openLatch() {
051                    _countDownLatch.countDown();
052            }
053    
054            @Override
055            public void receive(Message message) {
056                    try {
057                            _countDownLatch.await();
058                    }
059                    catch (InterruptedException ie) {
060                            _log.error(
061                                    "Latch opened prematurely by interruption. Dependence may " +
062                                            "not be ready.");
063                    }
064    
065                    doReceive(message);
066            }
067    
068            @Override
069            public void setState(InputStream inputStream) throws Exception {
070            }
071    
072            @Override
073            public void suspect(Address address) {
074            }
075    
076            @Override
077            public void unblock() {
078            }
079    
080            @Override
081            public void viewAccepted(View view) {
082                    if (_log.isInfoEnabled()) {
083                            _log.info("Accepted view " + view);
084                    }
085    
086                    if (_view == null) {
087                            _view = view;
088    
089                            return;
090                    }
091    
092                    try {
093                            _countDownLatch.await();
094                    }
095                    catch (InterruptedException ie) {
096                            _log.error(
097                                    "Latch opened prematurely by interruption. Dependence may " +
098                                            "not be ready.");
099                    }
100    
101                    View oldView = _view;
102    
103                    _view = view;
104    
105                    doViewAccepted(oldView, view);
106            }
107    
108            protected abstract void doReceive(Message message);
109    
110            protected void doViewAccepted(View oldView, View newView) {
111            }
112    
113            protected Object retrievePayload(Message message) {
114                    byte[] rawBuffer = message.getRawBuffer();
115    
116                    if (rawBuffer == null) {
117                            if (_log.isWarnEnabled()) {
118                                    _log.warn("Message content is null");
119                            }
120    
121                            return null;
122                    }
123    
124                    ByteBuffer byteBuffer = ByteBuffer.wrap(
125                            rawBuffer, message.getOffset(), message.getLength());
126    
127                    Deserializer deserializer = new Deserializer(byteBuffer.slice());
128    
129                    try {
130                            return deserializer.readObject();
131                    }
132                    catch (ClassNotFoundException cnfe) {
133                            if (_log.isWarnEnabled()) {
134                                    _log.warn("Unable to deserialize message payload", cnfe);
135                            }
136                    }
137    
138                    return null;
139            }
140    
141            private static Log _log = LogFactoryUtil.getLog(BaseReceiver.class);
142    
143            private final CountDownLatch _countDownLatch = new CountDownLatch(1);
144            private volatile View _view;
145    
146    }