001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashMap;
038 import java.util.HashSet;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
053 public ChannelImpl() {
054 this(CompanyConstants.SYSTEM, 0);
055 }
056
057 public ChannelImpl(long companyId, long usedId) {
058 super(companyId, usedId);
059 }
060
061 @Override
062 public Channel clone(long companyId, long userId) {
063 return new ChannelImpl(companyId, userId);
064 }
065
066 @Override
067 public void confirmDelivery(Collection<String> notificationEventUuids)
068 throws ChannelException {
069
070 confirmDelivery(notificationEventUuids, false);
071 }
072
073 @Override
074 public void confirmDelivery(
075 Collection<String> notificationEventUuids, boolean archive)
076 throws ChannelException {
077
078 _reentrantLock.lock();
079
080 try {
081 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
082 if (archive) {
083 UserNotificationEventLocalServiceUtil.
084 updateUserNotificationEvents(
085 notificationEventUuids, archive);
086 }
087 else {
088 UserNotificationEventLocalServiceUtil.
089 deleteUserNotificationEvents(notificationEventUuids);
090 }
091 }
092
093 for (String notificationEventUuid : notificationEventUuids) {
094 Map<String, NotificationEvent> unconfirmedNotificationEvents =
095 _getUnconfirmedNotificationEvents();
096
097 unconfirmedNotificationEvents.remove(notificationEventUuid);
098 }
099 }
100 catch (Exception e) {
101 throw new ChannelException(
102 "Unable to confirm delivery for user " + getUserId() , e);
103 }
104 finally {
105 _reentrantLock.unlock();
106 }
107 }
108
109 @Override
110 public void confirmDelivery(String notificationEventUuid)
111 throws ChannelException {
112
113 confirmDelivery(notificationEventUuid, false);
114 }
115
116 @Override
117 public void confirmDelivery(String notificationEventUuid, boolean archive)
118 throws ChannelException {
119
120 _reentrantLock.lock();
121
122 try {
123 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
124 if (archive) {
125 UserNotificationEventLocalServiceUtil.
126 updateUserNotificationEvent(
127 notificationEventUuid, archive);
128 }
129 else {
130 UserNotificationEventLocalServiceUtil.
131 deleteUserNotificationEvent(notificationEventUuid);
132 }
133 }
134
135 Map<String, NotificationEvent> unconfirmedNotificationEvents =
136 _getUnconfirmedNotificationEvents();
137
138 unconfirmedNotificationEvents.remove(notificationEventUuid);
139 }
140 catch (Exception e) {
141 throw new ChannelException(
142 "Unable to confirm delivery for " + notificationEventUuid , e);
143 }
144 finally {
145 _reentrantLock.unlock();
146 }
147 }
148
149 @Override
150 public void deleteUserNotificiationEvent(String notificationEventUuid)
151 throws ChannelException {
152
153 _reentrantLock.lock();
154
155 try {
156 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvent(
157 notificationEventUuid);
158
159 Map<String, NotificationEvent> unconfirmedNotificationEvents =
160 _getUnconfirmedNotificationEvents();
161
162 unconfirmedNotificationEvents.remove(notificationEventUuid);
163 }
164 catch (Exception e) {
165 throw new ChannelException(
166 "Unable to delete event " + notificationEventUuid , e);
167 }
168 finally {
169 _reentrantLock.unlock();
170 }
171 }
172
173 @Override
174 public void deleteUserNotificiationEvents(
175 Collection<String> notificationEventUuids)
176 throws ChannelException {
177
178 _reentrantLock.lock();
179
180 try {
181 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
182 notificationEventUuids);
183
184 for (String notificationEventUuid : notificationEventUuids) {
185 Map<String, NotificationEvent> unconfirmedNotificationEvents =
186 _getUnconfirmedNotificationEvents();
187
188 unconfirmedNotificationEvents.remove(notificationEventUuid);
189 }
190 }
191 catch (Exception e) {
192 throw new ChannelException(
193 "Unable to delete events for user " + getUserId() , e);
194 }
195 finally {
196 _reentrantLock.unlock();
197 }
198 }
199
200 @Override
201 public void flush() {
202 _reentrantLock.lock();
203
204 try {
205 if (_notificationEvents != null) {
206 _notificationEvents.clear();
207 }
208 }
209 finally {
210 _reentrantLock.unlock();
211 }
212 }
213
214 @Override
215 public void flush(long timestamp) {
216 _reentrantLock.lock();
217
218 try {
219 if (_notificationEvents == null) {
220 return;
221 }
222
223 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
224
225 while (itr.hasNext()) {
226 NotificationEvent notificationEvent = itr.next();
227
228 if (notificationEvent.getTimestamp() < timestamp) {
229 itr.remove();
230 }
231 }
232 }
233 finally {
234 _reentrantLock.unlock();
235 }
236 }
237
238 @Override
239 public List<NotificationEvent> getNotificationEvents(boolean flush)
240 throws ChannelException {
241
242 _reentrantLock.lock();
243
244 try {
245 return doGetNotificationEvents(flush);
246 }
247 catch (ChannelException ce) {
248 throw ce;
249 }
250 catch (Exception e) {
251 throw new ChannelException(e);
252 }
253 finally {
254 _reentrantLock.unlock();
255 }
256 }
257
258 @Override
259 public void init() throws ChannelException {
260 _reentrantLock.lock();
261
262 try {
263 doInit();
264 }
265 catch (SystemException se) {
266 throw new ChannelException(
267 "Unable to init channel " + getUserId(), se);
268 }
269 finally {
270 _reentrantLock.unlock();
271 }
272 }
273
274 @Override
275 public void removeTransientNotificationEvents(
276 Collection<NotificationEvent> notificationEvents) {
277
278 _reentrantLock.lock();
279
280 try {
281 if (_notificationEvents != null) {
282 _notificationEvents.removeAll(notificationEvents);
283 }
284 }
285 finally {
286 _reentrantLock.unlock();
287 }
288 }
289
290 @Override
291 public void removeTransientNotificationEventsByUuid(
292 Collection<String> notificationEventUuids) {
293
294 Set<String> notificationEventUuidsSet = new HashSet<String>(
295 notificationEventUuids);
296
297 _reentrantLock.lock();
298
299 try {
300 if (_notificationEvents == null) {
301 return;
302 }
303
304 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
305
306 while (itr.hasNext()) {
307 NotificationEvent notificationEvent = itr.next();
308
309 if (notificationEventUuidsSet.contains(
310 notificationEvent.getUuid())) {
311
312 itr.remove();
313 }
314 }
315 }
316 finally {
317 _reentrantLock.unlock();
318 }
319 }
320
321 @Override
322 public void sendNotificationEvent(NotificationEvent notificationEvent)
323 throws ChannelException {
324
325 _reentrantLock.lock();
326
327 try {
328 long currentTime = System.currentTimeMillis();
329
330 storeNotificationEvent(notificationEvent, currentTime);
331
332 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
333 notificationEvent.isDeliveryRequired()) {
334
335 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
336 getUserId(), notificationEvent);
337 }
338 }
339 catch (Exception e) {
340 throw new ChannelException("Unable to send event", e);
341 }
342 finally {
343 _reentrantLock.unlock();
344 }
345
346 notifyChannelListeners();
347 }
348
349 @Override
350 public void sendNotificationEvents(
351 Collection<NotificationEvent> notificationEvents)
352 throws ChannelException {
353
354 _reentrantLock.lock();
355
356 try {
357 long currentTime = System.currentTimeMillis();
358
359 List<NotificationEvent> persistedNotificationEvents =
360 new ArrayList<NotificationEvent>(notificationEvents.size());
361
362 for (NotificationEvent notificationEvent : notificationEvents) {
363 storeNotificationEvent(notificationEvent, currentTime);
364
365 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
366 notificationEvent.isDeliveryRequired()) {
367
368 persistedNotificationEvents.add(notificationEvent);
369 }
370 }
371
372 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
373 !persistedNotificationEvents.isEmpty()) {
374
375 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
376 getUserId(), persistedNotificationEvents);
377 }
378 }
379 catch (Exception e) {
380 throw new ChannelException("Unable to send event", e);
381 }
382 finally {
383 _reentrantLock.unlock();
384 }
385
386 notifyChannelListeners();
387 }
388
389 @Override
390 protected void doCleanUp() throws Exception {
391 _reentrantLock.lock();
392
393 try {
394 long currentTime = System.currentTimeMillis();
395
396 TreeSet<NotificationEvent> notificationEvents =
397 _getNotificationEvents();
398
399 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
400
401 while (itr1.hasNext()) {
402 NotificationEvent notificationEvent = itr1.next();
403
404 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
405 itr1.remove();
406 }
407 }
408
409 Map<String, NotificationEvent> unconfirmedNotificationEvents =
410 _getUnconfirmedNotificationEvents();
411
412 List<String> invalidNotificationEventUuids = new ArrayList<String>(
413 unconfirmedNotificationEvents.size());
414
415 Set<Map.Entry<String, NotificationEvent>>
416 unconfirmedNotificationEventsSet =
417 unconfirmedNotificationEvents.entrySet();
418
419 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
420 unconfirmedNotificationEventsSet.iterator();
421
422 while (itr2.hasNext()) {
423 Map.Entry<String, NotificationEvent> entry = itr2.next();
424
425 NotificationEvent notificationEvent = entry.getValue();
426
427 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
428 invalidNotificationEventUuids.add(entry.getKey());
429
430 itr2.remove();
431 }
432 }
433
434 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
435 !invalidNotificationEventUuids.isEmpty()) {
436
437 UserNotificationEventLocalServiceUtil.
438 deleteUserNotificationEvents(invalidNotificationEventUuids);
439 }
440 }
441 catch (Exception e) {
442 throw new ChannelException(
443 "Unable to clean up channel " + getUserId(), e);
444 }
445 finally {
446 _reentrantLock.unlock();
447 }
448 }
449
450 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
451 throws Exception {
452
453 long currentTime = System.currentTimeMillis();
454
455 TreeSet<NotificationEvent> notificationEventsSet =
456 _getNotificationEvents();
457
458 Map<String, NotificationEvent> unconfirmedNotificationEvents =
459 _getUnconfirmedNotificationEvents();
460
461 List<NotificationEvent> notificationEvents =
462 new ArrayList<NotificationEvent>(
463 notificationEventsSet.size() +
464 unconfirmedNotificationEvents.size());
465
466 for (NotificationEvent notificationEvent : notificationEventsSet) {
467 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
468 break;
469 }
470 else {
471 notificationEvents.add(notificationEvent);
472 }
473 }
474
475 if (flush) {
476 notificationEventsSet.clear();
477 }
478 else if (notificationEventsSet.size() != notificationEvents.size()) {
479 notificationEventsSet.retainAll(notificationEvents);
480 }
481
482 List<String> invalidNotificationEventUuids = new ArrayList<String>(
483 unconfirmedNotificationEvents.size());
484
485 Set<Map.Entry<String, NotificationEvent>>
486 unconfirmedNotificationEventsSet =
487 unconfirmedNotificationEvents.entrySet();
488
489 Iterator<Map.Entry<String, NotificationEvent>> itr =
490 unconfirmedNotificationEventsSet.iterator();
491
492 while (itr.hasNext()) {
493 Map.Entry<String, NotificationEvent> entry = itr.next();
494
495 NotificationEvent notificationEvent = entry.getValue();
496
497 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
498 !notificationEvent.isArchived()) {
499
500 invalidNotificationEventUuids.add(notificationEvent.getUuid());
501
502 itr.remove();
503 }
504 else {
505 notificationEvents.add(entry.getValue());
506 }
507 }
508
509 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
510 !invalidNotificationEventUuids.isEmpty()) {
511
512 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
513 invalidNotificationEventUuids);
514 }
515
516 return notificationEvents;
517 }
518
519 protected void doInit() throws SystemException {
520 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
521 return;
522 }
523
524 List<UserNotificationEvent> userNotificationEvents =
525 UserNotificationEventLocalServiceUtil.getUserNotificationEvents(
526 getUserId(), false);
527
528 Map<String, NotificationEvent> unconfirmedNotificationEvents =
529 _getUnconfirmedNotificationEvents();
530
531 List<String> invalidNotificationEventUuids = new ArrayList<String>(
532 unconfirmedNotificationEvents.size());
533
534 long currentTime = System.currentTimeMillis();
535
536 for (UserNotificationEvent persistedNotificationEvent :
537 userNotificationEvents) {
538
539 try {
540 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
541 persistedNotificationEvent.getPayload());
542
543 NotificationEvent notificationEvent =
544 NotificationEventFactoryUtil.createNotificationEvent(
545 persistedNotificationEvent.getTimestamp(),
546 persistedNotificationEvent.getType(),
547 payloadJSONObject);
548
549 notificationEvent.setDeliveryRequired(
550 persistedNotificationEvent.getDeliverBy());
551
552 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
553
554 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
555 invalidNotificationEventUuids.add(
556 notificationEvent.getUuid());
557 }
558 else {
559 unconfirmedNotificationEvents.put(
560 notificationEvent.getUuid(), notificationEvent);
561 }
562 }
563 catch (JSONException jsone) {
564 _log.error(jsone, jsone);
565
566 invalidNotificationEventUuids.add(
567 persistedNotificationEvent.getUuid());
568 }
569 }
570
571 if (!invalidNotificationEventUuids.isEmpty()) {
572 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
573 invalidNotificationEventUuids);
574 }
575 }
576
577 protected boolean isRemoveNotificationEvent(
578 NotificationEvent notificationEvent, long currentTime) {
579
580 if ((notificationEvent.getDeliverBy() != 0) &&
581 (notificationEvent.getDeliverBy() <= currentTime)) {
582
583 return true;
584 }
585 else {
586 return false;
587 }
588 }
589
590 protected void storeNotificationEvent(
591 NotificationEvent notificationEvent, long currentTime) {
592
593 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
594 return;
595 }
596
597 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
598 notificationEvent.isDeliveryRequired()) {
599
600 Map<String, NotificationEvent> unconfirmedNotificationEvents =
601 _getUnconfirmedNotificationEvents();
602
603 unconfirmedNotificationEvents.put(
604 notificationEvent.getUuid(), notificationEvent);
605 }
606 else {
607 TreeSet<NotificationEvent> notificationEvents =
608 _getNotificationEvents();
609
610 notificationEvents.add(notificationEvent);
611
612 if (notificationEvents.size() >
613 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
614
615 NotificationEvent firstNotificationEvent =
616 notificationEvents.first();
617
618 notificationEvents.remove(firstNotificationEvent);
619 }
620 }
621 }
622
623 private TreeSet<NotificationEvent> _getNotificationEvents() {
624 if (_notificationEvents == null) {
625 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
626 }
627
628 return _notificationEvents;
629 }
630
631 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
632 if (_unconfirmedNotificationEvents == null) {
633 _unconfirmedNotificationEvents =
634 new HashMap<String, NotificationEvent>();
635 }
636
637 return _unconfirmedNotificationEvents;
638 }
639
640 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
641
642 private static Comparator<NotificationEvent> _comparator =
643 new NotificationEventComparator();
644
645 private TreeSet<NotificationEvent> _notificationEvents;
646 private ReentrantLock _reentrantLock = new ReentrantLock();
647 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
648
649 }