001
014
015 package com.liferay.portal.kernel.notifications;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019
020 import java.util.List;
021 import java.util.concurrent.CopyOnWriteArrayList;
022 import java.util.concurrent.atomic.AtomicLong;
023
024
027 public abstract class BaseChannelImpl implements Channel {
028
029 @Override
030 public void cleanUp() throws ChannelException {
031 long currentTime = System.currentTimeMillis();
032
033 long nextCleanUpTime = _nextCleanUpTime.get();
034
035 if ((currentTime > nextCleanUpTime) &&
036 _nextCleanUpTime.compareAndSet(
037 nextCleanUpTime, currentTime + _cleanUpInterval)) {
038
039 try {
040 doCleanUp();
041 }
042 catch (ChannelException ce) {
043 throw ce;
044 }
045 catch (Exception e) {
046 throw new ChannelException(e);
047 }
048 }
049 }
050
051 @Override
052 public void close() throws ChannelException {
053 flush();
054 }
055
056 public long getCompanyId() {
057 return _companyId;
058 }
059
060 @Override
061 public List<NotificationEvent> getNotificationEvents()
062 throws ChannelException {
063
064 return getNotificationEvents(true);
065 }
066
067 @Override
068 public long getUserId() {
069 return _userId;
070 }
071
072 public boolean hasNotificationEvents() {
073 try {
074 List<NotificationEvent> notificationEvents = getNotificationEvents(
075 false);
076
077 if (!notificationEvents.isEmpty()) {
078 return true;
079 }
080 }
081 catch (ChannelException ce) {
082 if (_log.isErrorEnabled()) {
083 _log.error("Unable to fetch notifications", ce);
084 }
085 }
086
087 return false;
088 }
089
090 @Override
091 public void registerChannelListener(ChannelListener channelListener) {
092 _channelListeners.add(channelListener);
093
094 if (hasNotificationEvents()) {
095 notifyChannelListeners();
096 }
097 }
098
099 public void setCleanUpInterval(long cleanUpInterval) {
100 _cleanUpInterval = cleanUpInterval;
101 }
102
103 @Override
104 public void unregisterChannelListener(ChannelListener channelListener) {
105 _channelListeners.remove(channelListener);
106
107 channelListener.channelListenerRemoved(_userId);
108 }
109
110 protected BaseChannelImpl(long companyId, long usedId) {
111 _companyId = companyId;
112 _userId = usedId;
113 }
114
115 protected abstract void doCleanUp() throws Exception;
116
117 protected void notifyChannelListeners() {
118 for (ChannelListener channelListener : _channelListeners) {
119 channelListener.notificationEventsAvailable(_userId);
120 }
121 }
122
123 private static Log _log = LogFactoryUtil.getLog(BaseChannelImpl.class);
124
125 private List<ChannelListener> _channelListeners =
126 new CopyOnWriteArrayList<ChannelListener>();
127 private long _cleanUpInterval;
128 private long _companyId;
129 private AtomicLong _nextCleanUpTime = new AtomicLong();
130 private long _userId;
131
132 }