001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
019 import com.liferay.portal.kernel.cluster.ClusterRequest;
020 import com.liferay.portal.kernel.notifications.Channel;
021 import com.liferay.portal.kernel.notifications.ChannelException;
022 import com.liferay.portal.kernel.notifications.ChannelHub;
023 import com.liferay.portal.kernel.notifications.ChannelHubManager;
024 import com.liferay.portal.kernel.notifications.ChannelHubManagerUtil;
025 import com.liferay.portal.kernel.notifications.ChannelListener;
026 import com.liferay.portal.kernel.notifications.DuplicateChannelHubException;
027 import com.liferay.portal.kernel.notifications.NotificationEvent;
028 import com.liferay.portal.kernel.notifications.UnknownChannelHubException;
029 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
030 import com.liferay.portal.kernel.util.MethodHandler;
031 import com.liferay.portal.kernel.util.MethodKey;
032
033 import java.util.Collection;
034 import java.util.Collections;
035 import java.util.List;
036 import java.util.concurrent.ConcurrentHashMap;
037 import java.util.concurrent.ConcurrentMap;
038
039
044 @DoPrivileged
045 public class ChannelHubManagerImpl implements ChannelHubManager {
046
047 @Override
048 public void confirmDelivery(
049 long companyId, long userId,
050 Collection<String> notificationEventUuids)
051 throws ChannelException {
052
053 confirmDelivery(companyId, userId, notificationEventUuids, false);
054 }
055
056 @Override
057 public void confirmDelivery(
058 long companyId, long userId,
059 Collection<String> notificationEventUuids, boolean archive)
060 throws ChannelException {
061
062 ChannelHub channelHub = getChannelHub(companyId);
063
064 channelHub.confirmDelivery(userId, notificationEventUuids, archive);
065 }
066
067 @Override
068 public void confirmDelivery(
069 long companyId, long userId, String notificationEventUuid)
070 throws ChannelException {
071
072 confirmDelivery(companyId, userId, notificationEventUuid, false);
073 }
074
075 @Override
076 public void confirmDelivery(
077 long companyId, long userId, String notificationEventUuid,
078 boolean archive)
079 throws ChannelException {
080
081 ChannelHub channelHub = getChannelHub(companyId);
082
083 channelHub.confirmDelivery(userId, notificationEventUuid, archive);
084 }
085
086 @Override
087 public Channel createChannel(long companyId, long userId)
088 throws ChannelException {
089
090 ChannelHub channelHub = getChannelHub(companyId);
091
092 return channelHub.createChannel(userId);
093 }
094
095 @Override
096 public ChannelHub createChannelHub(long companyId) throws ChannelException {
097 ChannelHub channelHub = _channelHub.clone(companyId);
098
099 if (_channelHubs.putIfAbsent(companyId, channelHub) != null) {
100 throw new DuplicateChannelHubException(
101 "Channel already exists with company id " + companyId);
102 }
103
104 return channelHub;
105 }
106
107 @Override
108 public void deleteUserNotificiationEvent(
109 long companyId, long userId, String notificationEventUuid)
110 throws ChannelException {
111
112 ChannelHub channelHub = getChannelHub(companyId);
113
114 channelHub.deleteUserNotificiationEvent(userId, notificationEventUuid);
115 }
116
117 @Override
118 public void deleteUserNotificiationEvents(
119 long companyId, long userId,
120 Collection<String> notificationEventUuids)
121 throws ChannelException {
122
123 ChannelHub channelHub = getChannelHub(companyId);
124
125 channelHub.deleteUserNotificiationEvents(
126 userId, notificationEventUuids);
127 }
128
129 @Override
130 public void destroyChannel(long companyId, long userId)
131 throws ChannelException {
132
133 ChannelHub channelHub = getChannelHub(companyId);
134
135 channelHub.destroyChannel(userId);
136
137 if (!ClusterInvokeThreadLocal.isEnabled()) {
138 return;
139 }
140
141 MethodHandler methodHandler = new MethodHandler(
142 _destroyChannelMethodKey, companyId, userId);
143
144 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
145 methodHandler, true);
146
147 try {
148 ClusterExecutorUtil.execute(clusterRequest);
149 }
150 catch (Exception e) {
151 throw new ChannelException(
152 "Unable to destroy channel across cluster", e);
153 }
154 }
155
156 @Override
157 public void destroyChannelHub(long companyId) throws ChannelException {
158 ChannelHub channelHub = _channelHubs.remove(companyId);
159
160 if (channelHub != null) {
161 channelHub.destroy();
162 }
163 }
164
165 @Override
166 public ChannelHub fetchChannelHub(long companyId) throws ChannelException {
167 return fetchChannelHub(companyId, false);
168 }
169
170 @Override
171 public ChannelHub fetchChannelHub(long companyId, boolean createIfAbsent)
172 throws ChannelException {
173
174 ChannelHub channelHub = _channelHubs.get(companyId);
175
176 if (channelHub == null) {
177 synchronized(_channelHubs) {
178 channelHub = _channelHubs.get(companyId);
179
180 if (channelHub == null) {
181 if (createIfAbsent) {
182 channelHub = createChannelHub(companyId);
183 }
184 }
185 }
186 }
187
188 return channelHub;
189 }
190
191 @Override
192 public List<NotificationEvent> fetchNotificationEvents(
193 long companyId, long userId, boolean flush)
194 throws ChannelException {
195
196 ChannelHub channelHub = fetchChannelHub(companyId);
197
198 if (channelHub == null) {
199 return Collections.emptyList();
200 }
201
202 return channelHub.fetchNotificationEvents(userId, flush);
203 }
204
205 @Override
206 public void flush() throws ChannelException {
207 for (ChannelHub channelHub : _channelHubs.values()) {
208 channelHub.flush();
209 }
210 }
211
212 @Override
213 public void flush(long companyId) throws ChannelException {
214 ChannelHub channelHub = fetchChannelHub(companyId);
215
216 if (channelHub != null) {
217 channelHub.flush();
218 }
219 }
220
221 @Override
222 public void flush(long companyId, long userId, long timestamp)
223 throws ChannelException {
224
225 ChannelHub channelHub = fetchChannelHub(companyId);
226
227 if (channelHub != null) {
228 channelHub.flush(userId, timestamp);
229 }
230 }
231
232 @Override
233 public Channel getChannel(long companyId, long userId)
234 throws ChannelException {
235
236 return getChannel(companyId, userId, false);
237 }
238
239 @Override
240 public Channel getChannel(
241 long companyId, long userId, boolean createIfAbsent)
242 throws ChannelException {
243
244 ChannelHub channelHub = getChannelHub(companyId, createIfAbsent);
245
246 return channelHub.getChannel(userId, createIfAbsent);
247 }
248
249 @Override
250 public ChannelHub getChannelHub(long companyId) throws ChannelException {
251 return getChannelHub(companyId, false);
252 }
253
254 @Override
255 public ChannelHub getChannelHub(long companyId, boolean createIfAbsent)
256 throws ChannelException {
257
258 ChannelHub channelHub = fetchChannelHub(companyId, createIfAbsent);
259
260 if (channelHub == null) {
261 throw new UnknownChannelHubException(
262 "No channel exists with company id " + companyId);
263 }
264
265 return channelHub;
266 }
267
268 @Override
269 public List<NotificationEvent> getNotificationEvents(
270 long companyId, long userId)
271 throws ChannelException {
272
273 ChannelHub channelHub = getChannelHub(companyId);
274
275 return channelHub.getNotificationEvents(userId);
276 }
277
278 @Override
279 public List<NotificationEvent> getNotificationEvents(
280 long companyId, long userId, boolean flush)
281 throws ChannelException {
282
283 ChannelHub channelHub = getChannelHub(companyId);
284
285 return channelHub.getNotificationEvents(userId, flush);
286 }
287
288 @Override
289 public Collection<Long> getUserIds(long companyId) throws ChannelException {
290 ChannelHub channelHub = getChannelHub(companyId);
291
292 return channelHub.getUserIds();
293 }
294
295 @Override
296 public void registerChannelListener(
297 long companyId, long userId, ChannelListener channelListener)
298 throws ChannelException {
299
300 ChannelHub channelHub = getChannelHub(companyId);
301
302 channelHub.registerChannelListener(userId, channelListener);
303 }
304
305 @Override
306 public void removeTransientNotificationEvents(
307 long companyId, long userId,
308 Collection<NotificationEvent> notificationEvents)
309 throws ChannelException {
310
311 ChannelHub channelHub = getChannelHub(companyId);
312
313 channelHub.removeTransientNotificationEvents(
314 userId, notificationEvents);
315 }
316
317 @Override
318 public void removeTransientNotificationEventsByUuid(
319 long companyId, long userId,
320 Collection<String> notificationEventUuids)
321 throws ChannelException {
322
323 ChannelHub channelHub = getChannelHub(companyId);
324
325 channelHub.removeTransientNotificationEventsByUuid(
326 userId, notificationEventUuids);
327 }
328
329 @Override
330 public void sendNotificationEvent(
331 long companyId, long userId, NotificationEvent notificationEvent)
332 throws ChannelException {
333
334 ChannelHub channelHub = getChannelHub(companyId);
335
336 channelHub.sendNotificationEvent(userId, notificationEvent);
337
338 if (!ClusterInvokeThreadLocal.isEnabled()) {
339 return;
340 }
341
342 MethodHandler methodHandler = new MethodHandler(
343 _storeNotificationEventMethodKey, companyId, userId,
344 notificationEvent);
345
346 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
347 methodHandler, true);
348
349 try {
350 ClusterExecutorUtil.execute(clusterRequest);
351 }
352 catch (Exception e) {
353 throw new ChannelException("Unable to notify cluster of event", e);
354 }
355 }
356
357 @Override
358 public void sendNotificationEvents(
359 long companyId, long userId,
360 Collection<NotificationEvent> notificationEvents)
361 throws ChannelException {
362
363 ChannelHub channelHub = getChannelHub(companyId);
364
365 channelHub.sendNotificationEvents(userId, notificationEvents);
366 }
367
368 public void setChannelHubPrototype(ChannelHub channelHub) {
369 _channelHub = channelHub;
370 }
371
372 @Override
373 public void storeNotificationEvent(
374 long companyId, long userId, NotificationEvent notificationEvent)
375 throws ChannelException {
376
377 ChannelHub channelHub = getChannelHub(companyId);
378
379 channelHub.storeNotificationEvent(userId, notificationEvent);
380 }
381
382 @Override
383 public void unregisterChannelListener(
384 long companyId, long userId, ChannelListener channelListener)
385 throws ChannelException {
386
387 ChannelHub channelHub = getChannelHub(companyId);
388
389 channelHub.unregisterChannelListener(userId, channelListener);
390 }
391
392 private static final MethodKey _storeNotificationEventMethodKey =
393 new MethodKey(
394 ChannelHubManagerUtil.class, "storeNotificationEvent", long.class,
395 long.class, NotificationEvent.class);
396
397 private static final MethodKey _destroyChannelMethodKey =
398 new MethodKey(
399 ChannelHubManagerUtil.class, "destroyChannel", long.class,
400 long.class);
401
402 private ChannelHub _channelHub;
403 private ConcurrentMap<Long, ChannelHub> _channelHubs =
404 new ConcurrentHashMap<Long, ChannelHub>();
405
406 }