001
014
015 package com.liferay.util.transport;
016
017 import java.io.IOException;
018
019 import java.net.DatagramPacket;
020 import java.net.InetAddress;
021 import java.net.MulticastSocket;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026
035 public class MulticastTransport extends Thread implements Transport {
036
037 public MulticastTransport(DatagramHandler handler, String host, int port) {
038 super("MulticastListener-" + host + port);
039
040 setDaemon(true);
041 _handler = handler;
042 _host = host;
043 _port = port;
044 }
045
046 @Override
047 public synchronized void connect() throws IOException {
048 if (_socket == null) {
049 _socket = new MulticastSocket(_port);
050 }
051 else if (_socket.isConnected() && _socket.isBound()) {
052 return;
053 }
054
055 _address = InetAddress.getByName(_host);
056
057 _socket.joinGroup(_address);
058
059 _connected = true;
060
061 start();
062 }
063
064 @Override
065 public synchronized void disconnect() {
066
067
068
069 if (_address != null) {
070 try {
071 _socket.leaveGroup(_address);
072 _address = null;
073 }
074 catch (IOException ioe) {
075 _log.error("Unable to leave group", ioe);
076 }
077 }
078
079 _connected = false;
080
081 interrupt();
082
083 _socket.close();
084 }
085
086 @Override
087 public boolean isConnected() {
088 return _connected;
089 }
090
091 @Override
092 public void run() {
093 try {
094 while (_connected) {
095 _socket.receive(_inboundPacket);
096 _handler.process(_inboundPacket);
097 }
098 }
099 catch (IOException ioe) {
100 if (!_connected) {
101 if (_log.isDebugEnabled()) {
102 _log.debug("Unable to disconnect", ioe);
103 }
104
105 return;
106 }
107
108 _log.error("Unable to process ", ioe);
109
110 _socket.disconnect();
111
112 _connected = false;
113
114 _handler.errorReceived(ioe);
115 }
116 }
117
118 public synchronized void sendMessage(byte[] bytes) throws IOException {
119 _outboundPacket.setData(bytes);
120 _outboundPacket.setAddress(_address);
121 _outboundPacket.setPort(_port);
122
123 _socket.send(_outboundPacket);
124 }
125
126 @Override
127 public synchronized void sendMessage(String message) throws IOException {
128 sendMessage(message.getBytes());
129 }
130
131 private static Log _log = LogFactory.getLog(MulticastTransport.class);
132
133 private InetAddress _address;
134 private boolean _connected;
135 private DatagramHandler _handler;
136 private String _host;
137 private byte[] _inboundBuffer = new byte[4096];
138 private DatagramPacket _inboundPacket = new DatagramPacket(
139 _inboundBuffer, _inboundBuffer.length);
140 private byte[] _outboundBuffer = new byte[4096];
141 private DatagramPacket _outboundPacket = new DatagramPacket(
142 _outboundBuffer, _outboundBuffer.length);
143 private int _port;
144 private MulticastSocket _socket;
145
146 }