001
014
015 package com.liferay.util.transport;
016
017 import java.io.IOException;
018
019 import java.net.DatagramPacket;
020 import java.net.InetAddress;
021 import java.net.MulticastSocket;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026
035 public class MulticastTransport extends Thread implements Transport {
036
037 public MulticastTransport(DatagramHandler handler, String host, int port) {
038 super("MulticastListener-" + host + port);
039
040 setDaemon(true);
041 _handler = handler;
042 _host = host;
043 _port = port;
044 }
045
046 @Override
047 public synchronized void connect() throws IOException {
048 if (_socket == null) {
049 _socket = new MulticastSocket(_port);
050 }
051 else if (_socket.isConnected() && _socket.isBound()) {
052 return;
053 }
054
055 _address = InetAddress.getByName(_host);
056
057 _socket.joinGroup(_address);
058
059 _connected = true;
060
061 start();
062 }
063
064 @Override
065 public synchronized void disconnect() {
066
067
068
069 if (_address != null) {
070 try {
071 _socket.leaveGroup(_address);
072 _address = null;
073 }
074 catch (IOException ioe) {
075 _log.error("Unable to leave group", ioe);
076 }
077 }
078
079 _connected = false;
080
081 interrupt();
082
083 _socket.close();
084 }
085
086 @Override
087 public boolean isConnected() {
088 return _connected;
089 }
090
091 @Override
092 public void run() {
093 try {
094 while (_connected) {
095 _socket.receive(_inboundPacket);
096 _handler.process(_inboundPacket);
097 }
098 }
099 catch (IOException ioe) {
100 _log.error("Unable to process ", ioe);
101
102 _socket.disconnect();
103
104 _connected = false;
105
106 _handler.errorReceived(ioe);
107 }
108 }
109
110 public synchronized void sendMessage(byte[] bytes) throws IOException {
111 _outboundPacket.setData(bytes);
112 _outboundPacket.setAddress(_address);
113 _outboundPacket.setPort(_port);
114
115 _socket.send(_outboundPacket);
116 }
117
118 @Override
119 public synchronized void sendMessage(String message) throws IOException {
120 sendMessage(message.getBytes());
121 }
122
123 private static Log _log = LogFactory.getLog(MulticastTransport.class);
124
125 private InetAddress _address;
126 private boolean _connected;
127 private DatagramHandler _handler;
128 private String _host;
129 private byte[] _inboundBuffer = new byte[4096];
130 private DatagramPacket _inboundPacket = new DatagramPacket(
131 _inboundBuffer, _inboundBuffer.length);
132 private byte[] _outboundBuffer = new byte[4096];
133 private DatagramPacket _outboundPacket = new DatagramPacket(
134 _outboundBuffer, _outboundBuffer.length);
135 private int _port;
136 private MulticastSocket _socket;
137
138 }