001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019    import com.liferay.portal.kernel.cluster.ClusterRequest;
020    import com.liferay.portal.kernel.notifications.Channel;
021    import com.liferay.portal.kernel.notifications.ChannelException;
022    import com.liferay.portal.kernel.notifications.ChannelHub;
023    import com.liferay.portal.kernel.notifications.ChannelHubManager;
024    import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
025    import com.liferay.portal.kernel.notifications.ChannelListener;
026    import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
027    import com.liferay.portal.kernel.notifications.NotificationEvent;
028    import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
029    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
030    import com.liferay.portal.kernel.util.MethodHandler;
031    import com.liferay.portal.kernel.util.MethodKey;
032    
033    import java.util.Collection;
034    import java.util.Collections;
035    import java.util.List;
036    import java.util.concurrent.ConcurrentHashMap;
037    import java.util.concurrent.ConcurrentMap;
038    
039    /**
040     * @author Edward Han
041     * @author Brian Wing Shun
042     * @author Shuyang Zhou
043     */
044    @DoPrivileged
045    public class ChannelHubManagerImpl implements ChannelHubManager {
046    
047            @Override
048            public void confirmDelivery(
049                            long companyId, long userId,
050                            Collection<String> notificationEventUuids)
051                    throws ChannelException {
052    
053                    confirmDelivery(companyId, userId, notificationEventUuids, false);
054            }
055    
056            @Override
057            public void confirmDelivery(
058                            long companyId, long userId,
059                            Collection<String> notificationEventUuids, boolean archive)
060                    throws ChannelException {
061    
062                    ChannelHub channelHub = getChannelHub(companyId);
063    
064                    channelHub.confirmDelivery(userId, notificationEventUuids, archive);
065            }
066    
067            @Override
068            public void confirmDelivery(
069                            long companyId, long userId, String notificationEventUuid)
070                    throws ChannelException {
071    
072                    confirmDelivery(companyId, userId, notificationEventUuid, false);
073            }
074    
075            @Override
076            public void confirmDelivery(
077                            long companyId, long userId, String notificationEventUuid,
078                            boolean archive)
079                    throws ChannelException {
080    
081                    ChannelHub channelHub = getChannelHub(companyId);
082    
083                    channelHub.confirmDelivery(userId, notificationEventUuid, archive);
084            }
085    
086            @Override
087            public Channel createChannel(long companyId, long userId)
088                    throws ChannelException {
089    
090                    ChannelHub channelHub = getChannelHub(companyId);
091    
092                    return channelHub.createChannel(userId);
093            }
094    
095            @Override
096            public ChannelHub createChannelHub(long companyId) throws ChannelException {
097                    ChannelHub channelHub = _channelHub.clone(companyId);
098    
099                    if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
100                            throw new DuplicateChannelHubException(
101                                    "Channel already exists with company id " + companyId);
102                    }
103    
104                    return channelHub;
105            }
106    
107            @Override
108            public void deleteUserNotificiationEvent(
109                            long companyId, long userId, String notificationEventUuid)
110                    throws ChannelException {
111    
112                    ChannelHub channelHub = getChannelHub(companyId);
113    
114                    channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
115            }
116    
117            @Override
118            public void deleteUserNotificiationEvents(
119                            long companyId, long userId,
120                            Collection<String> notificationEventUuids)
121                    throws ChannelException {
122    
123                    ChannelHub channelHub = getChannelHub(companyId);
124    
125                    channelHub.deleteUserNotificiationEvents(
126                            userId, notificationEventUuids);
127            }
128    
129            @Override
130            public void destroyChannel(long companyId, long userId)
131                    throws ChannelException {
132    
133                    ChannelHub channelHub = getChannelHub(companyId);
134    
135                    channelHub.destroyChannel(userId);
136    
137                    if (!ClusterInvokeThreadLocal.isEnabled()) {
138                            return;
139                    }
140    
141                    MethodHandler methodHandler = new MethodHandler(
142                            _destroyChannelMethodKey, companyId, userId);
143    
144                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
145                            methodHandler, true);
146    
147                    try {
148                            ClusterExecutorUtil.execute(clusterRequest);
149                    }
150                    catch (Exception e) {
151                            throw new ChannelException(
152                                    "Unable to destroy channel across cluster", e);
153                    }
154            }
155    
156            @Override
157            public void destroyChannelHub(long companyId) throws ChannelException {
158                    ChannelHub channelHub = _channelHubs.remove(companyId);
159    
160                    if (channelHub != null) {
161                            channelHub.destroy();
162                    }
163            }
164    
165            @Override
166            public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
167                    return fetchChannelHub(companyId, false);
168            }
169    
170            @Override
171            public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
172                    throws ChannelException {
173    
174                    ChannelHub channelHub = _channelHubs.get(companyId);
175    
176                    if (channelHub == null) {
177                            synchronized(_channelHubs) {
178                                    channelHub = _channelHubs.get(companyId);
179    
180                                    if (channelHub == null) {
181                                            if (createIfAbsent) {
182                                                    channelHub = createChannelHub(companyId);
183                                            }
184                                    }
185                            }
186                    }
187    
188                    return channelHub;
189            }
190    
191            @Override
192            public List<NotificationEvent> fetchNotificationEvents(
193                            long companyId, long userId, boolean flush)
194                    throws ChannelException {
195    
196                    ChannelHub channelHub = fetchChannelHub(companyId);
197    
198                    if (channelHub == null) {
199                            return Collections.emptyList();
200                    }
201    
202                    return channelHub.fetchNotificationEvents(userId, flush);
203            }
204    
205            @Override
206            public void flush() throws ChannelException {
207                    for (ChannelHub channelHub : _channelHubs.values()) {
208                            channelHub.flush();
209                    }
210            }
211    
212            @Override
213            public void flush(long companyId) throws ChannelException {
214                    ChannelHub channelHub = fetchChannelHub(companyId);
215    
216                    if (channelHub != null) {
217                            channelHub.flush();
218                    }
219            }
220    
221            @Override
222            public void flush(long companyId, long userId, long timestamp)
223                    throws ChannelException {
224    
225                    ChannelHub channelHub = fetchChannelHub(companyId);
226    
227                    if (channelHub != null) {
228                            channelHub.flush(userId, timestamp);
229                    }
230            }
231    
232            @Override
233            public Channel getChannel(long companyId, long userId)
234                    throws ChannelException {
235    
236                    return getChannel(companyId, userId, false);
237            }
238    
239            @Override
240            public Channel getChannel(
241                            long companyId, long userId, boolean createIfAbsent)
242                    throws ChannelException {
243    
244                    ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
245    
246                    return channelHub.getChannel(userId, createIfAbsent);
247            }
248    
249            @Override
250            public ChannelHub getChannelHub(long companyId) throws ChannelException {
251                    return getChannelHub(companyId, false);
252            }
253    
254            @Override
255            public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
256                    throws ChannelException {
257    
258                    ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
259    
260                    if (channelHub == null) {
261                            throw new UnknownChannelHubException(
262                                    "No channel exists with company id " + companyId);
263                    }
264    
265                    return channelHub;
266            }
267    
268            @Override
269            public List<NotificationEvent> getNotificationEvents(
270                            long companyId, long userId)
271                    throws ChannelException {
272    
273                    ChannelHub channelHub = getChannelHub(companyId);
274    
275                    return channelHub.getNotificationEvents(userId);
276            }
277    
278            @Override
279            public List<NotificationEvent> getNotificationEvents(
280                            long companyId, long userId, boolean flush)
281                    throws ChannelException {
282    
283                    ChannelHub channelHub = getChannelHub(companyId);
284    
285                    return channelHub.getNotificationEvents(userId, flush);
286            }
287    
288            @Override
289            public Collection<Long> getUserIds(long companyId) throws ChannelException {
290                    ChannelHub channelHub = getChannelHub(companyId);
291    
292                    return channelHub.getUserIds();
293            }
294    
295            @Override
296            public void registerChannelListener(
297                            long companyId, long userId, ChannelListener channelListener)
298                    throws ChannelException {
299    
300                    ChannelHub channelHub = getChannelHub(companyId);
301    
302                    channelHub.registerChannelListener(userId, channelListener);
303            }
304    
305            @Override
306            public void removeTransientNotificationEvents(
307                            long companyId, long userId,
308                            Collection<NotificationEvent> notificationEvents)
309                    throws ChannelException {
310    
311                    ChannelHub channelHub = getChannelHub(companyId);
312    
313                    channelHub.removeTransientNotificationEvents(
314                            userId, notificationEvents);
315            }
316    
317            @Override
318            public void removeTransientNotificationEventsByUuid(
319                            long companyId, long userId,
320                            Collection<String> notificationEventUuids)
321                    throws ChannelException {
322    
323                    ChannelHub channelHub = getChannelHub(companyId);
324    
325                    channelHub.removeTransientNotificationEventsByUuid(
326                            userId, notificationEventUuids);
327            }
328    
329            @Override
330            public void sendNotificationEvent(
331                            long companyId, long userId, NotificationEvent notificationEvent)
332                    throws ChannelException {
333    
334                    ChannelHub channelHub = getChannelHub(companyId);
335    
336                    channelHub.sendNotificationEvent(userId, notificationEvent);
337    
338                    if (!ClusterInvokeThreadLocal.isEnabled()) {
339                            return;
340                    }
341    
342                    MethodHandler methodHandler = new MethodHandler(
343                            _storeNotificationEventMethodKey, companyId, userId,
344                            notificationEvent);
345    
346                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
347                            methodHandler, true);
348    
349                    try {
350                            ClusterExecutorUtil.execute(clusterRequest);
351                    }
352                    catch (Exception e) {
353                            throw new ChannelException("Unable to notify cluster of event", e);
354                    }
355            }
356    
357            @Override
358            public void sendNotificationEvents(
359                            long companyId, long userId,
360                            Collection<NotificationEvent> notificationEvents)
361                    throws ChannelException {
362    
363                    ChannelHub channelHub = getChannelHub(companyId);
364    
365                    channelHub.sendNotificationEvents(userId, notificationEvents);
366            }
367    
368            public void setChannelHubPrototype(ChannelHub channelHub) {
369                    _channelHub = channelHub;
370            }
371    
372            @Override
373            public void storeNotificationEvent(
374                            long companyId, long userId, NotificationEvent notificationEvent)
375                    throws ChannelException {
376    
377                    ChannelHub channelHub = getChannelHub(companyId);
378    
379                    channelHub.storeNotificationEvent(userId, notificationEvent);
380            }
381    
382            @Override
383            public void unregisterChannelListener(
384                            long companyId, long userId, ChannelListener channelListener)
385                    throws ChannelException {
386    
387                    ChannelHub channelHub = getChannelHub(companyId);
388    
389                    channelHub.unregisterChannelListener(userId, channelListener);
390            }
391    
392            private static final MethodKey _storeNotificationEventMethodKey =
393                    new MethodKey(
394                            ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
395                            long.class, NotificationEvent.class);
396    
397            private static final MethodKey _destroyChannelMethodKey =
398                    new MethodKey(
399                            ChannelHubManagerUtil.class, "destroyChannel", long.class,
400                            long.class);
401    
402            private ChannelHub _channelHub;
403            private ConcurrentMap<Long, ChannelHub> _channelHubs =
404                    new ConcurrentHashMap<Long, ChannelHub>();
405    
406    }