001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.notifications.Channel;
018    import com.liferay.portal.kernel.notifications.ChannelException;
019    import com.liferay.portal.kernel.notifications.ChannelHub;
020    import com.liferay.portal.kernel.notifications.ChannelListener;
021    import com.liferay.portal.kernel.notifications.NotificationEvent;
022    import com.liferay.portal.kernel.notifications.UnknownChannelException;
023    import com.liferay.portal.model.CompanyConstants;
024    import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
025    import com.liferay.portal.util.PropsValues;
026    
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.Collections;
030    import java.util.Iterator;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.Set;
034    import java.util.concurrent.ConcurrentHashMap;
035    import java.util.concurrent.ConcurrentMap;
036    
037    /**
038     * @author Edward Han
039     * @author Brian Wing Shun Chan
040     * @author Shuyang Zhou
041     */
042    public class ChannelHubImpl implements ChannelHub {
043    
044            @Override
045            public void cleanUp() throws ChannelException {
046                    for (Channel channel : _channels.values()) {
047                            channel.cleanUp();
048                    }
049            }
050    
051            @Override
052            public void cleanUp(long userId) throws ChannelException {
053                    Channel channel = getChannel(userId);
054    
055                    channel.cleanUp();
056            }
057    
058            @Override
059            public ChannelHub clone(long companyId) {
060                    ChannelHubImpl channelHubImpl = new ChannelHubImpl();
061    
062                    channelHubImpl.setChannelPrototype(_channel);
063                    channelHubImpl.setCompanyId(companyId);
064    
065                    return channelHubImpl;
066            }
067    
068            @Override
069            public void confirmDelivery(
070                            long userId, Collection<String> notificationEventUuids)
071                    throws ChannelException {
072    
073                    confirmDelivery(userId, notificationEventUuids, false);
074            }
075    
076            @Override
077            public void confirmDelivery(
078                            long userId, Collection<String> notificationEventUuids,
079                            boolean archive)
080                    throws ChannelException {
081    
082                    Channel channel = getChannel(userId);
083    
084                    channel.confirmDelivery(notificationEventUuids, archive);
085            }
086    
087            @Override
088            public void confirmDelivery(long userId, String notificationEventUuid)
089                    throws ChannelException {
090    
091                    confirmDelivery(userId, notificationEventUuid, false);
092            }
093    
094            @Override
095            public void confirmDelivery(
096                            long userId, String notificationEventUuid, boolean archive)
097                    throws ChannelException {
098    
099                    Channel channel = getChannel(userId);
100    
101                    channel.confirmDelivery(notificationEventUuid, archive);
102            }
103    
104            @Override
105            public Channel createChannel(long userId) throws ChannelException {
106                    if (_channels.containsKey(userId)) {
107                            return _channels.get(userId);
108                    }
109    
110                    Channel channel = _channel.clone(_companyId, userId);
111    
112                    Channel oldChannel = _channels.putIfAbsent(userId, channel);
113    
114                    if (oldChannel != null) {
115                            channel.sendNotificationEvents(oldChannel.getNotificationEvents());
116                    }
117    
118                    channel.init();
119    
120                    return channel;
121            }
122    
123            @Override
124            public void deleteUserNotificiationEvent(
125                            long userId, String notificationEventUuid)
126                    throws ChannelException {
127    
128                    Channel channel = getChannel(userId);
129    
130                    channel.deleteUserNotificiationEvent(notificationEventUuid);
131            }
132    
133            @Override
134            public void deleteUserNotificiationEvents(
135                            long userId, Collection<String> notificationEventUuids)
136                    throws ChannelException {
137    
138                    Channel channel = getChannel(userId);
139    
140                    channel.deleteUserNotificiationEvents(notificationEventUuids);
141            }
142    
143            @Override
144            public void destroy() throws ChannelException {
145                    Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
146    
147                    Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
148    
149                    while (itr.hasNext()) {
150                            Channel channel = itr.next().getValue();
151    
152                            channel.close();
153    
154                            itr.remove();
155                    }
156            }
157    
158            @Override
159            public Channel destroyChannel(long userId) throws ChannelException {
160                    Channel channel = _channels.remove(userId);
161    
162                    if (channel != null) {
163                            channel.close();
164                    }
165    
166                    return channel;
167            }
168    
169            @Override
170            public Channel fetchChannel(long userId) throws ChannelException {
171                    return fetchChannel(userId, false);
172            }
173    
174            @Override
175            public Channel fetchChannel(long userId, boolean createIfAbsent)
176                    throws ChannelException {
177    
178                    Channel channel = _channels.get(userId);
179    
180                    if (channel == null) {
181                            synchronized (_channels) {
182                                    channel = _channels.get(userId);
183    
184                                    if (channel == null) {
185                                            if (createIfAbsent) {
186                                                    channel = createChannel(userId);
187                                            }
188                                    }
189                            }
190                    }
191    
192                    return channel;
193            }
194    
195            @Override
196            public List<NotificationEvent> fetchNotificationEvents(long userId)
197                    throws ChannelException {
198    
199                    return fetchNotificationEvents(userId, false);
200            }
201    
202            @Override
203            public List<NotificationEvent> fetchNotificationEvents(
204                            long userId, boolean flush)
205                    throws ChannelException {
206    
207                    Channel channel = fetchChannel(userId);
208    
209                    if (channel == null) {
210                            return Collections.emptyList();
211                    }
212    
213                    return channel.getNotificationEvents(flush);
214            }
215    
216            @Override
217            public void flush() throws ChannelException {
218                    for (Channel channel : _channels.values()) {
219                            channel.flush();
220                    }
221            }
222    
223            @Override
224            public void flush(long userId) throws ChannelException {
225                    Channel channel = fetchChannel(userId);
226    
227                    if (channel != null) {
228                            channel.flush();
229                    }
230            }
231    
232            @Override
233            public void flush(long userId, long timestamp) throws ChannelException {
234                    Channel channel = fetchChannel(userId);
235    
236                    if (channel != null) {
237                            channel.flush(timestamp);
238                    }
239            }
240    
241            @Override
242            public Channel getChannel(long userId) throws ChannelException {
243                    return getChannel(userId, false);
244            }
245    
246            @Override
247            public Channel getChannel(long userId, boolean createIfAbsent)
248                    throws ChannelException {
249    
250                    Channel channel = fetchChannel(userId, createIfAbsent);
251    
252                    if (channel == null) {
253                            throw new UnknownChannelException(
254                                    "No channel exists with user id " + userId);
255                    }
256    
257                    return channel;
258            }
259    
260            public long getCompanyId() {
261                    return _companyId;
262            }
263    
264            @Override
265            public List<NotificationEvent> getNotificationEvents(long userId)
266                    throws ChannelException {
267    
268                    return getNotificationEvents(userId, false);
269            }
270    
271            @Override
272            public List<NotificationEvent> getNotificationEvents(
273                            long userId, boolean flush)
274                    throws ChannelException {
275    
276                    Channel channel = getChannel(userId, false);
277    
278                    return channel.getNotificationEvents(flush);
279            }
280    
281            @Override
282            public Collection<Long> getUserIds() {
283                    return Collections.unmodifiableSet(_channels.keySet());
284            }
285    
286            @Override
287            public void registerChannelListener(
288                            long userId, ChannelListener channelListener)
289                    throws ChannelException {
290    
291                    Channel channel = getChannel(userId);
292    
293                    channel.registerChannelListener(channelListener);
294            }
295    
296            @Override
297            public void removeTransientNotificationEvents(
298                            long userId, Collection<NotificationEvent> notificationEvents)
299                    throws ChannelException {
300    
301                    Channel channel = fetchChannel(userId);
302    
303                    if (channel != null) {
304                            channel.removeTransientNotificationEvents(notificationEvents);
305                    }
306            }
307    
308            @Override
309            public void removeTransientNotificationEventsByUuid(
310                            long userId, Collection<String> notificationEventUuids)
311                    throws ChannelException {
312    
313                    Channel channel = fetchChannel(userId);
314    
315                    if (channel != null) {
316                            channel.removeTransientNotificationEventsByUuid(
317                                    notificationEventUuids);
318                    }
319            }
320    
321            @Override
322            public void sendNotificationEvent(
323                            long userId, NotificationEvent notificationEvent)
324                    throws ChannelException {
325    
326                    Channel channel = fetchChannel(userId);
327    
328                    if (channel != null) {
329                            channel.sendNotificationEvent(notificationEvent);
330    
331                            return;
332                    }
333    
334                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
335                            !notificationEvent.isDeliveryRequired()) {
336    
337                            return;
338                    }
339    
340                    try {
341                            UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
342                                    userId, notificationEvent);
343                    }
344                    catch (Exception e) {
345                            throw new ChannelException("Unable to send event", e);
346                    }
347            }
348    
349            @Override
350            public void sendNotificationEvents(
351                            long userId, Collection<NotificationEvent> notificationEvents)
352                    throws ChannelException {
353    
354                    Channel channel = fetchChannel(userId);
355    
356                    if (channel != null) {
357                            channel.sendNotificationEvents(notificationEvents);
358    
359                            return;
360                    }
361    
362                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
363                            return;
364                    }
365    
366                    List<NotificationEvent> persistedNotificationEvents =
367                                    new ArrayList<NotificationEvent>(notificationEvents.size());
368    
369                    for (NotificationEvent notificationEvent : notificationEvents) {
370                            if (notificationEvent.isDeliveryRequired()) {
371                                    persistedNotificationEvents.add(notificationEvent);
372                            }
373                    }
374    
375                    try {
376                            UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
377                                    userId, persistedNotificationEvents);
378                    }
379                    catch (Exception e) {
380                            throw new ChannelException("Unable to send events", e);
381                    }
382            }
383    
384            public void setChannelPrototype(Channel channel) {
385                    _channel = channel;
386            }
387    
388            public void setCompanyId(long companyId) {
389                    _companyId = companyId;
390            }
391    
392            @Override
393            public void unregisterChannelListener(
394                            long userId, ChannelListener channelListener)
395                    throws ChannelException {
396    
397                    Channel channel = getChannel(userId);
398    
399                    channel.unregisterChannelListener(channelListener);
400            }
401    
402            private Channel _channel;
403            private ConcurrentMap<Long, Channel> _channels =
404                    new ConcurrentHashMap<Long, Channel>();
405            private long _companyId = CompanyConstants.SYSTEM;
406    
407    }