001
014
015 package com.liferay.util.transport;
016
017 import java.io.IOException;
018
019 import java.net.DatagramPacket;
020 import java.net.InetAddress;
021 import java.net.MulticastSocket;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026
035 public class MulticastTransport extends Thread implements Transport {
036
037 public MulticastTransport(DatagramHandler handler, String host, int port) {
038 super("MulticastListener-" + host + port);
039
040 setDaemon(true);
041 _handler = handler;
042 _host = host;
043 _port = port;
044 }
045
046 public synchronized void connect() throws IOException {
047 if (_socket == null) {
048 _socket = new MulticastSocket(_port);
049 }
050 else if (_socket.isConnected() && _socket.isBound()) {
051 return;
052 }
053
054 _address = InetAddress.getByName(_host);
055
056 _socket.joinGroup(_address);
057
058 _connected = true;
059
060 start();
061 }
062
063 public synchronized void disconnect() {
064
065
066
067 if (_address != null) {
068 try {
069 _socket.leaveGroup(_address);
070 _address = null;
071 }
072 catch (IOException e) {
073 _log.error("Unable to leave group", e);
074 }
075 }
076
077 _connected = false;
078
079 interrupt();
080
081 _socket.close();
082 }
083
084 public synchronized void sendMessage(String msg) throws IOException {
085 _outboundPacket.setData(msg.getBytes());
086 _outboundPacket.setAddress(_address);
087 _outboundPacket.setPort(_port);
088
089 _socket.send(_outboundPacket);
090 }
091
092 public boolean isConnected() {
093 return _connected;
094 }
095
096 public void run() {
097 try {
098 while (_connected) {
099 _socket.receive(_inboundPacket);
100 _handler.process(_inboundPacket);
101 }
102 }
103 catch (IOException e) {
104 _log.error("Unable to process ", e);
105
106 _socket.disconnect();
107
108 _connected = false;
109
110 _handler.errorReceived(e);
111 }
112 }
113
114 private static Log _log = LogFactory.getLog(MulticastTransport.class);
115
116 private byte[] _inboundBuffer = new byte[4096];
117 private DatagramPacket _inboundPacket =
118 new DatagramPacket(_inboundBuffer, _inboundBuffer.length);
119 private byte[] _outboundBuffer = new byte[4096];
120 private DatagramPacket _outboundPacket =
121 new DatagramPacket(_outboundBuffer, _outboundBuffer.length);
122 private String _host;
123 private DatagramHandler _handler;
124 private int _port;
125 private boolean _connected;
126 private MulticastSocket _socket;
127 private InetAddress _address;
128
129 }