001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashSet;
038 import java.util.Iterator;
039 import java.util.LinkedHashMap;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
/**
 * Creates a channel bound to the system company
 * ({@link CompanyConstants#SYSTEM}) and to user ID <code>0</code>.
 */
public ChannelImpl() {
	this(CompanyConstants.SYSTEM, 0L);
}
056
/**
 * Creates a channel for the given company and user. Both IDs are handed
 * unchanged to the {@link BaseChannelImpl} constructor.
 *
 * @param companyId the ID of the company the channel belongs to
 * @param userId the ID of the user the channel belongs to
 */
public ChannelImpl(long companyId, long userId) {
	super(companyId, userId);
}
060
061 @Override
062 public Channel clone(long companyId, long userId) {
063 return new ChannelImpl(companyId, userId);
064 }
065
066 @Override
067 public void confirmDelivery(Collection<String> notificationEventUuids)
068 throws ChannelException {
069
070 confirmDelivery(notificationEventUuids, false);
071 }
072
073 @Override
074 public void confirmDelivery(
075 Collection<String> notificationEventUuids, boolean archive)
076 throws ChannelException {
077
078 _reentrantLock.lock();
079
080 try {
081 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
082 if (archive) {
083 UserNotificationEventLocalServiceUtil.
084 updateUserNotificationEvents(
085 notificationEventUuids, getCompanyId(), archive);
086 }
087 else {
088 UserNotificationEventLocalServiceUtil.
089 deleteUserNotificationEvents(
090 notificationEventUuids, getCompanyId());
091 }
092 }
093
094 for (String notificationEventUuid : notificationEventUuids) {
095 Map<String, NotificationEvent> unconfirmedNotificationEvents =
096 _getUnconfirmedNotificationEvents();
097
098 unconfirmedNotificationEvents.remove(notificationEventUuid);
099 }
100 }
101 catch (Exception e) {
102 throw new ChannelException(
103 "Unable to confirm delivery for user " + getUserId() , e);
104 }
105 finally {
106 _reentrantLock.unlock();
107 }
108 }
109
110 @Override
111 public void confirmDelivery(String notificationEventUuid)
112 throws ChannelException {
113
114 confirmDelivery(notificationEventUuid, false);
115 }
116
117 @Override
118 public void confirmDelivery(String notificationEventUuid, boolean archive)
119 throws ChannelException {
120
121 _reentrantLock.lock();
122
123 try {
124 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
125 if (archive) {
126 UserNotificationEventLocalServiceUtil.
127 updateUserNotificationEvent(
128 notificationEventUuid, getCompanyId(), archive);
129 }
130 else {
131 UserNotificationEventLocalServiceUtil.
132 deleteUserNotificationEvent(
133 notificationEventUuid, getCompanyId());
134 }
135 }
136
137 Map<String, NotificationEvent> unconfirmedNotificationEvents =
138 _getUnconfirmedNotificationEvents();
139
140 unconfirmedNotificationEvents.remove(notificationEventUuid);
141 }
142 catch (Exception e) {
143 throw new ChannelException(
144 "Unable to confirm delivery for " + notificationEventUuid , e);
145 }
146 finally {
147 _reentrantLock.unlock();
148 }
149 }
150
151 @Override
152 public void deleteUserNotificiationEvent(String notificationEventUuid)
153 throws ChannelException {
154
155 _reentrantLock.lock();
156
157 try {
158 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvent(
159 notificationEventUuid, getCompanyId());
160
161 Map<String, NotificationEvent> unconfirmedNotificationEvents =
162 _getUnconfirmedNotificationEvents();
163
164 unconfirmedNotificationEvents.remove(notificationEventUuid);
165 }
166 catch (Exception e) {
167 throw new ChannelException(
168 "Unable to delete event " + notificationEventUuid , e);
169 }
170 finally {
171 _reentrantLock.unlock();
172 }
173 }
174
175 @Override
176 public void deleteUserNotificiationEvents(
177 Collection<String> notificationEventUuids)
178 throws ChannelException {
179
180 _reentrantLock.lock();
181
182 try {
183 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
184 notificationEventUuids, getCompanyId());
185
186 for (String notificationEventUuid : notificationEventUuids) {
187 Map<String, NotificationEvent> unconfirmedNotificationEvents =
188 _getUnconfirmedNotificationEvents();
189
190 unconfirmedNotificationEvents.remove(notificationEventUuid);
191 }
192 }
193 catch (Exception e) {
194 throw new ChannelException(
195 "Unable to delete events for user " + getUserId() , e);
196 }
197 finally {
198 _reentrantLock.unlock();
199 }
200 }
201
202 @Override
203 public void flush() {
204 _reentrantLock.lock();
205
206 try {
207 if (_notificationEvents != null) {
208 _notificationEvents.clear();
209 }
210 }
211 finally {
212 _reentrantLock.unlock();
213 }
214 }
215
216 @Override
217 public void flush(long timestamp) {
218 _reentrantLock.lock();
219
220 try {
221 if (_notificationEvents == null) {
222 return;
223 }
224
225 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
226
227 while (itr.hasNext()) {
228 NotificationEvent notificationEvent = itr.next();
229
230 if (notificationEvent.getTimestamp() < timestamp) {
231 itr.remove();
232 }
233 }
234 }
235 finally {
236 _reentrantLock.unlock();
237 }
238 }
239
240 @Override
241 public List<NotificationEvent> getNotificationEvents(boolean flush)
242 throws ChannelException {
243
244 _reentrantLock.lock();
245
246 try {
247 return doGetNotificationEvents(flush);
248 }
249 catch (ChannelException ce) {
250 throw ce;
251 }
252 catch (Exception e) {
253 throw new ChannelException(e);
254 }
255 finally {
256 _reentrantLock.unlock();
257 }
258 }
259
260 @Override
261 public void init() throws ChannelException {
262 _reentrantLock.lock();
263
264 try {
265 doInit();
266 }
267 catch (SystemException se) {
268 throw new ChannelException(
269 "Unable to init channel " + getUserId(), se);
270 }
271 finally {
272 _reentrantLock.unlock();
273 }
274 }
275
276 @Override
277 public void removeTransientNotificationEvents(
278 Collection<NotificationEvent> notificationEvents) {
279
280 _reentrantLock.lock();
281
282 try {
283 if (_notificationEvents != null) {
284 _notificationEvents.removeAll(notificationEvents);
285 }
286 }
287 finally {
288 _reentrantLock.unlock();
289 }
290 }
291
292 @Override
293 public void removeTransientNotificationEventsByUuid(
294 Collection<String> notificationEventUuids) {
295
296 Set<String> notificationEventUuidsSet = new HashSet<String>(
297 notificationEventUuids);
298
299 _reentrantLock.lock();
300
301 try {
302 if (_notificationEvents == null) {
303 return;
304 }
305
306 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
307
308 while (itr.hasNext()) {
309 NotificationEvent notificationEvent = itr.next();
310
311 if (notificationEventUuidsSet.contains(
312 notificationEvent.getUuid())) {
313
314 itr.remove();
315 }
316 }
317 }
318 finally {
319 _reentrantLock.unlock();
320 }
321 }
322
323 @Override
324 public void sendNotificationEvent(NotificationEvent notificationEvent)
325 throws ChannelException {
326
327 _reentrantLock.lock();
328
329 try {
330 long currentTime = System.currentTimeMillis();
331
332 storeNotificationEvent(notificationEvent, currentTime);
333
334 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
335 notificationEvent.isDeliveryRequired()) {
336
337 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
338 getUserId(), notificationEvent);
339 }
340 }
341 catch (Exception e) {
342 throw new ChannelException("Unable to send event", e);
343 }
344 finally {
345 _reentrantLock.unlock();
346 }
347
348 notifyChannelListeners();
349 }
350
351 @Override
352 public void sendNotificationEvents(
353 Collection<NotificationEvent> notificationEvents)
354 throws ChannelException {
355
356 _reentrantLock.lock();
357
358 try {
359 long currentTime = System.currentTimeMillis();
360
361 List<NotificationEvent> persistedNotificationEvents =
362 new ArrayList<NotificationEvent>(notificationEvents.size());
363
364 for (NotificationEvent notificationEvent : notificationEvents) {
365 storeNotificationEvent(notificationEvent, currentTime);
366
367 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
368 notificationEvent.isDeliveryRequired()) {
369
370 persistedNotificationEvents.add(notificationEvent);
371 }
372 }
373
374 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
375 !persistedNotificationEvents.isEmpty()) {
376
377 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
378 getUserId(), persistedNotificationEvents);
379 }
380 }
381 catch (Exception e) {
382 throw new ChannelException("Unable to send event", e);
383 }
384 finally {
385 _reentrantLock.unlock();
386 }
387
388 notifyChannelListeners();
389 }
390
391 @Override
392 public void storeNotificationEvent(
393 NotificationEvent notificationEvent, long currentTime) {
394
395 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
396 return;
397 }
398
399 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
400 notificationEvent.isDeliveryRequired()) {
401
402 Map<String, NotificationEvent> unconfirmedNotificationEvents =
403 _getUnconfirmedNotificationEvents();
404
405 unconfirmedNotificationEvents.put(
406 notificationEvent.getUuid(), notificationEvent);
407 }
408 else {
409 TreeSet<NotificationEvent> notificationEvents =
410 _getNotificationEvents();
411
412 notificationEvents.add(notificationEvent);
413
414 if (notificationEvents.size() >
415 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
416
417 NotificationEvent firstNotificationEvent =
418 notificationEvents.first();
419
420 notificationEvents.remove(firstNotificationEvent);
421 }
422 }
423 }
424
425 @Override
426 protected void doCleanUp() throws Exception {
427 _reentrantLock.lock();
428
429 try {
430 long currentTime = System.currentTimeMillis();
431
432 TreeSet<NotificationEvent> notificationEvents =
433 _getNotificationEvents();
434
435 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
436
437 while (itr1.hasNext()) {
438 NotificationEvent notificationEvent = itr1.next();
439
440 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
441 itr1.remove();
442 }
443 }
444
445 Map<String, NotificationEvent> unconfirmedNotificationEvents =
446 _getUnconfirmedNotificationEvents();
447
448 List<String> invalidNotificationEventUuids = new ArrayList<String>(
449 unconfirmedNotificationEvents.size());
450
451 Set<Map.Entry<String, NotificationEvent>>
452 unconfirmedNotificationEventsSet =
453 unconfirmedNotificationEvents.entrySet();
454
455 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
456 unconfirmedNotificationEventsSet.iterator();
457
458 while (itr2.hasNext()) {
459 Map.Entry<String, NotificationEvent> entry = itr2.next();
460
461 NotificationEvent notificationEvent = entry.getValue();
462
463 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
464 invalidNotificationEventUuids.add(entry.getKey());
465
466 itr2.remove();
467 }
468 }
469
470 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
471 !invalidNotificationEventUuids.isEmpty()) {
472
473 UserNotificationEventLocalServiceUtil.
474 deleteUserNotificationEvents(
475 invalidNotificationEventUuids, getCompanyId());
476 }
477 }
478 catch (Exception e) {
479 throw new ChannelException(
480 "Unable to clean up channel " + getUserId(), e);
481 }
482 finally {
483 _reentrantLock.unlock();
484 }
485 }
486
487 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
488 throws Exception {
489
490 long currentTime = System.currentTimeMillis();
491
492 TreeSet<NotificationEvent> notificationEventsSet =
493 _getNotificationEvents();
494
495 Map<String, NotificationEvent> unconfirmedNotificationEvents =
496 _getUnconfirmedNotificationEvents();
497
498 List<NotificationEvent> notificationEvents =
499 new ArrayList<NotificationEvent>(
500 notificationEventsSet.size() +
501 unconfirmedNotificationEvents.size());
502
503 for (NotificationEvent notificationEvent : notificationEventsSet) {
504 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
505 break;
506 }
507 else {
508 notificationEvents.add(notificationEvent);
509 }
510 }
511
512 if (flush) {
513 notificationEventsSet.clear();
514 }
515 else if (notificationEventsSet.size() != notificationEvents.size()) {
516 notificationEventsSet.retainAll(notificationEvents);
517 }
518
519 List<String> invalidNotificationEventUuids = new ArrayList<String>(
520 unconfirmedNotificationEvents.size());
521
522 Set<Map.Entry<String, NotificationEvent>>
523 unconfirmedNotificationEventsSet =
524 unconfirmedNotificationEvents.entrySet();
525
526 Iterator<Map.Entry<String, NotificationEvent>> itr =
527 unconfirmedNotificationEventsSet.iterator();
528
529 while (itr.hasNext()) {
530 Map.Entry<String, NotificationEvent> entry = itr.next();
531
532 NotificationEvent notificationEvent = entry.getValue();
533
534 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
535 !notificationEvent.isArchived()) {
536
537 invalidNotificationEventUuids.add(notificationEvent.getUuid());
538
539 itr.remove();
540 }
541 else {
542 notificationEvents.add(entry.getValue());
543 }
544 }
545
546 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
547 !invalidNotificationEventUuids.isEmpty()) {
548
549 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
550 invalidNotificationEventUuids, getCompanyId());
551 }
552
553 return notificationEvents;
554 }
555
556 protected void doInit() throws SystemException {
557 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
558 return;
559 }
560
561 List<UserNotificationEvent> userNotificationEvents =
562 UserNotificationEventLocalServiceUtil.
563 getDeliveredUserNotificationEvents(getUserId(), false);
564
565 Map<String, NotificationEvent> unconfirmedNotificationEvents =
566 _getUnconfirmedNotificationEvents();
567
568 List<String> invalidNotificationEventUuids = new ArrayList<String>(
569 unconfirmedNotificationEvents.size());
570
571 long currentTime = System.currentTimeMillis();
572
573 for (UserNotificationEvent persistedNotificationEvent :
574 userNotificationEvents) {
575
576 try {
577 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
578 persistedNotificationEvent.getPayload());
579
580 NotificationEvent notificationEvent =
581 NotificationEventFactoryUtil.createNotificationEvent(
582 persistedNotificationEvent.getTimestamp(),
583 persistedNotificationEvent.getType(),
584 payloadJSONObject);
585
586 notificationEvent.setDeliveryRequired(
587 persistedNotificationEvent.getDeliverBy());
588
589 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
590
591 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
592 invalidNotificationEventUuids.add(
593 notificationEvent.getUuid());
594 }
595 else {
596 unconfirmedNotificationEvents.put(
597 notificationEvent.getUuid(), notificationEvent);
598 }
599 }
600 catch (JSONException jsone) {
601 _log.error(jsone, jsone);
602
603 invalidNotificationEventUuids.add(
604 persistedNotificationEvent.getUuid());
605 }
606 }
607
608 if (!invalidNotificationEventUuids.isEmpty()) {
609 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
610 invalidNotificationEventUuids, getCompanyId());
611 }
612 }
613
614 protected boolean isRemoveNotificationEvent(
615 NotificationEvent notificationEvent, long currentTime) {
616
617 if ((notificationEvent.getDeliverBy() != 0) &&
618 (notificationEvent.getDeliverBy() <= currentTime)) {
619
620 return true;
621 }
622 else {
623 return false;
624 }
625 }
626
627 private TreeSet<NotificationEvent> _getNotificationEvents() {
628 if (_notificationEvents == null) {
629 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
630 }
631
632 return _notificationEvents;
633 }
634
635 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
636 if (_unconfirmedNotificationEvents == null) {
637 _unconfirmedNotificationEvents =
638 new LinkedHashMap<String, NotificationEvent>();
639 }
640
641 return _unconfirmedNotificationEvents;
642 }
643
644 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
645
646 private static Comparator<NotificationEvent> _comparator =
647 new NotificationEventComparator();
648
649 private TreeSet<NotificationEvent> _notificationEvents;
650 private ReentrantLock _reentrantLock = new ReentrantLock();
651 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
652
653 }