001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.io.Deserializer;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.InputStream;
022 import java.io.OutputStream;
023
024 import java.nio.ByteBuffer;
025
026 import java.util.concurrent.CountDownLatch;
027
028 import org.jgroups.Address;
029 import org.jgroups.Message;
030 import org.jgroups.Receiver;
031 import org.jgroups.View;
032
033
036 public abstract class BaseReceiver implements Receiver {
037
038 @Override
039 public void block() {
040 }
041
042 @Override
043 public void getState(OutputStream outputStream) throws Exception {
044 }
045
046 public View getView() {
047 return _view;
048 }
049
050 public void openLatch() {
051 _countDownLatch.countDown();
052 }
053
054 @Override
055 public void receive(Message message) {
056 try {
057 _countDownLatch.await();
058 }
059 catch (InterruptedException ie) {
060 _log.error(
061 "Latch opened prematurely by interruption. Dependence may " +
062 "not be ready.");
063 }
064
065 doReceive(message);
066 }
067
068 @Override
069 public void setState(InputStream inputStream) throws Exception {
070 }
071
072 @Override
073 public void suspect(Address address) {
074 }
075
076 @Override
077 public void unblock() {
078 }
079
080 @Override
081 public void viewAccepted(View view) {
082 if (_log.isInfoEnabled()) {
083 _log.info("Accepted view " + view);
084 }
085
086 if (_view == null) {
087 _view = view;
088
089 return;
090 }
091
092 try {
093 _countDownLatch.await();
094 }
095 catch (InterruptedException ie) {
096 _log.error(
097 "Latch opened prematurely by interruption. Dependence may " +
098 "not be ready.");
099 }
100
101 View oldView = _view;
102
103 _view = view;
104
105 doViewAccepted(oldView, view);
106 }
107
108 protected abstract void doReceive(Message message);
109
110 protected void doViewAccepted(View oldView, View newView) {
111 }
112
113 protected Object retrievePayload(Message message) {
114 byte[] rawBuffer = message.getRawBuffer();
115
116 if (rawBuffer == null) {
117 if (_log.isWarnEnabled()) {
118 _log.warn("Message content is null");
119 }
120
121 return null;
122 }
123
124 ByteBuffer byteBuffer = ByteBuffer.wrap(
125 rawBuffer, message.getOffset(), message.getLength());
126
127 Deserializer deserializer = new Deserializer(byteBuffer.slice());
128
129 try {
130 return deserializer.readObject();
131 }
132 catch (ClassNotFoundException cnfe) {
133 if (_log.isWarnEnabled()) {
134 _log.warn("Unable to deserialize message payload", cnfe);
135 }
136 }
137
138 return null;
139 }
140
141 private static Log _log = LogFactoryUtil.getLog(BaseReceiver.class);
142
143 private final CountDownLatch _countDownLatch = new CountDownLatch(1);
144 private volatile View _view;
145
146 }