001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.notifications.Channel;
018 import com.liferay.portal.kernel.notifications.ChannelException;
019 import com.liferay.portal.kernel.notifications.ChannelHub;
020 import com.liferay.portal.kernel.notifications.ChannelListener;
021 import com.liferay.portal.kernel.notifications.NotificationEvent;
022 import com.liferay.portal.kernel.notifications.UnknownChannelException;
023 import com.liferay.portal.model.CompanyConstants;
024 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
025 import com.liferay.portal.util.PropsValues;
026
027 import java.util.ArrayList;
028 import java.util.Collection;
029 import java.util.Collections;
030 import java.util.Iterator;
031 import java.util.List;
032 import java.util.Map;
033 import java.util.Set;
034 import java.util.concurrent.ConcurrentHashMap;
035 import java.util.concurrent.ConcurrentMap;
036
037
042 public class ChannelHubImpl implements ChannelHub {
043
044 @Override
045 public void cleanUp() throws ChannelException {
046 for (Channel channel : _channels.values()) {
047 channel.cleanUp();
048 }
049 }
050
051 @Override
052 public void cleanUp(long userId) throws ChannelException {
053 Channel channel = getChannel(userId);
054
055 channel.cleanUp();
056 }
057
058 @Override
059 public ChannelHub clone(long companyId) {
060 ChannelHubImpl channelHubImpl = new ChannelHubImpl();
061
062 channelHubImpl.setChannelPrototype(_channel);
063 channelHubImpl.setCompanyId(companyId);
064
065 return channelHubImpl;
066 }
067
068 @Override
069 public void confirmDelivery(
070 long userId, Collection<String> notificationEventUuids)
071 throws ChannelException {
072
073 confirmDelivery(userId, notificationEventUuids, false);
074 }
075
076 @Override
077 public void confirmDelivery(
078 long userId, Collection<String> notificationEventUuids,
079 boolean archive)
080 throws ChannelException {
081
082 Channel channel = getChannel(userId);
083
084 channel.confirmDelivery(notificationEventUuids, archive);
085 }
086
087 @Override
088 public void confirmDelivery(long userId, String notificationEventUuid)
089 throws ChannelException {
090
091 confirmDelivery(userId, notificationEventUuid, false);
092 }
093
094 @Override
095 public void confirmDelivery(
096 long userId, String notificationEventUuid, boolean archive)
097 throws ChannelException {
098
099 Channel channel = getChannel(userId);
100
101 channel.confirmDelivery(notificationEventUuid, archive);
102 }
103
104 @Override
105 public Channel createChannel(long userId) throws ChannelException {
106 if (_channels.containsKey(userId)) {
107 return _channels.get(userId);
108 }
109
110 Channel channel = _channel.clone(_companyId, userId);
111
112 Channel oldChannel = _channels.putIfAbsent(userId, channel);
113
114 if (oldChannel != null) {
115 channel.sendNotificationEvents(oldChannel.getNotificationEvents());
116 }
117
118 channel.init();
119
120 return channel;
121 }
122
123 @Override
124 public void deleteUserNotificiationEvent(
125 long userId, String notificationEventUuid)
126 throws ChannelException {
127
128 Channel channel = getChannel(userId);
129
130 channel.deleteUserNotificiationEvent(notificationEventUuid);
131 }
132
133 @Override
134 public void deleteUserNotificiationEvents(
135 long userId, Collection<String> notificationEventUuids)
136 throws ChannelException {
137
138 Channel channel = getChannel(userId);
139
140 channel.deleteUserNotificiationEvents(notificationEventUuids);
141 }
142
143 @Override
144 public void destroy() throws ChannelException {
145 Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
146
147 Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
148
149 while (itr.hasNext()) {
150 Channel channel = itr.next().getValue();
151
152 channel.close();
153
154 itr.remove();
155 }
156 }
157
158 @Override
159 public Channel destroyChannel(long userId) throws ChannelException {
160 Channel channel = _channels.remove(userId);
161
162 if (channel != null) {
163 channel.close();
164 }
165
166 return channel;
167 }
168
169 @Override
170 public Channel fetchChannel(long userId) throws ChannelException {
171 return fetchChannel(userId, false);
172 }
173
174 @Override
175 public Channel fetchChannel(long userId, boolean createIfAbsent)
176 throws ChannelException {
177
178 Channel channel = _channels.get(userId);
179
180 if (channel == null) {
181 synchronized (_channels) {
182 channel = _channels.get(userId);
183
184 if (channel == null) {
185 if (createIfAbsent) {
186 channel = createChannel(userId);
187 }
188 }
189 }
190 }
191
192 return channel;
193 }
194
195 @Override
196 public List<NotificationEvent> fetchNotificationEvents(long userId)
197 throws ChannelException {
198
199 return fetchNotificationEvents(userId, false);
200 }
201
202 @Override
203 public List<NotificationEvent> fetchNotificationEvents(
204 long userId, boolean flush)
205 throws ChannelException {
206
207 Channel channel = fetchChannel(userId);
208
209 if (channel == null) {
210 return Collections.emptyList();
211 }
212
213 return channel.getNotificationEvents(flush);
214 }
215
216 @Override
217 public void flush() throws ChannelException {
218 for (Channel channel : _channels.values()) {
219 channel.flush();
220 }
221 }
222
223 @Override
224 public void flush(long userId) throws ChannelException {
225 Channel channel = fetchChannel(userId);
226
227 if (channel != null) {
228 channel.flush();
229 }
230 }
231
232 @Override
233 public void flush(long userId, long timestamp) throws ChannelException {
234 Channel channel = fetchChannel(userId);
235
236 if (channel != null) {
237 channel.flush(timestamp);
238 }
239 }
240
241 @Override
242 public Channel getChannel(long userId) throws ChannelException {
243 return getChannel(userId, false);
244 }
245
246 @Override
247 public Channel getChannel(long userId, boolean createIfAbsent)
248 throws ChannelException {
249
250 Channel channel = fetchChannel(userId, createIfAbsent);
251
252 if (channel == null) {
253 throw new UnknownChannelException(
254 "No channel exists with user id " + userId);
255 }
256
257 return channel;
258 }
259
260 public long getCompanyId() {
261 return _companyId;
262 }
263
264 @Override
265 public List<NotificationEvent> getNotificationEvents(long userId)
266 throws ChannelException {
267
268 return getNotificationEvents(userId, false);
269 }
270
271 @Override
272 public List<NotificationEvent> getNotificationEvents(
273 long userId, boolean flush)
274 throws ChannelException {
275
276 Channel channel = getChannel(userId, false);
277
278 return channel.getNotificationEvents(flush);
279 }
280
281 @Override
282 public Collection<Long> getUserIds() {
283 return Collections.unmodifiableSet(_channels.keySet());
284 }
285
286 @Override
287 public void registerChannelListener(
288 long userId, ChannelListener channelListener)
289 throws ChannelException {
290
291 Channel channel = getChannel(userId);
292
293 channel.registerChannelListener(channelListener);
294 }
295
296 @Override
297 public void removeTransientNotificationEvents(
298 long userId, Collection<NotificationEvent> notificationEvents)
299 throws ChannelException {
300
301 Channel channel = fetchChannel(userId);
302
303 if (channel != null) {
304 channel.removeTransientNotificationEvents(notificationEvents);
305 }
306 }
307
308 @Override
309 public void removeTransientNotificationEventsByUuid(
310 long userId, Collection<String> notificationEventUuids)
311 throws ChannelException {
312
313 Channel channel = fetchChannel(userId);
314
315 if (channel != null) {
316 channel.removeTransientNotificationEventsByUuid(
317 notificationEventUuids);
318 }
319 }
320
321 @Override
322 public void sendNotificationEvent(
323 long userId, NotificationEvent notificationEvent)
324 throws ChannelException {
325
326 Channel channel = fetchChannel(userId);
327
328 if (channel != null) {
329 channel.sendNotificationEvent(notificationEvent);
330
331 return;
332 }
333
334 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
335 !notificationEvent.isDeliveryRequired()) {
336
337 return;
338 }
339
340 try {
341 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
342 userId, notificationEvent);
343 }
344 catch (Exception e) {
345 throw new ChannelException("Unable to send event", e);
346 }
347 }
348
349 @Override
350 public void sendNotificationEvents(
351 long userId, Collection<NotificationEvent> notificationEvents)
352 throws ChannelException {
353
354 Channel channel = fetchChannel(userId);
355
356 if (channel != null) {
357 channel.sendNotificationEvents(notificationEvents);
358
359 return;
360 }
361
362 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
363 return;
364 }
365
366 List<NotificationEvent> persistedNotificationEvents =
367 new ArrayList<NotificationEvent>(notificationEvents.size());
368
369 for (NotificationEvent notificationEvent : notificationEvents) {
370 if (notificationEvent.isDeliveryRequired()) {
371 persistedNotificationEvents.add(notificationEvent);
372 }
373 }
374
375 try {
376 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
377 userId, persistedNotificationEvents);
378 }
379 catch (Exception e) {
380 throw new ChannelException("Unable to send events", e);
381 }
382 }
383
384 public void setChannelPrototype(Channel channel) {
385 _channel = channel;
386 }
387
388 public void setCompanyId(long companyId) {
389 _companyId = companyId;
390 }
391
392 @Override
393 public void unregisterChannelListener(
394 long userId, ChannelListener channelListener)
395 throws ChannelException {
396
397 Channel channel = getChannel(userId);
398
399 channel.unregisterChannelListener(channelListener);
400 }
401
402 private Channel _channel;
403 private ConcurrentMap<Long, Channel> _channels =
404 new ConcurrentHashMap<Long, Channel>();
405 private long _companyId = CompanyConstants.SYSTEM;
406
407 }