001
014
015 package com.liferay.portal.scheduler.job;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Message;
023 import com.liferay.portal.kernel.messaging.MessageBusUtil;
024 import com.liferay.portal.kernel.scheduler.JobState;
025 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
026 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
027 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
028 import com.liferay.portal.kernel.scheduler.StorageType;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.util.PropsValues;
032
033 import java.util.Map;
034
035 import org.quartz.Job;
036 import org.quartz.JobDataMap;
037 import org.quartz.JobDetail;
038 import org.quartz.JobExecutionContext;
039 import org.quartz.JobKey;
040 import org.quartz.Scheduler;
041
042
046 public class MessageSenderJob implements Job {
047
048 @Override
049 public void execute(JobExecutionContext jobExecutionContext) {
050 try {
051 doExecute(jobExecutionContext);
052 }
053 catch (Exception e) {
054 _log.error("Unable to execute job", e);
055 }
056 }
057
058 protected void doExecute(JobExecutionContext jobExecutionContext)
059 throws Exception {
060
061 JobDetail jobDetail = jobExecutionContext.getJobDetail();
062
063 JobDataMap jobDataMap = jobDetail.getJobDataMap();
064
065 String destinationName = jobDataMap.getString(
066 SchedulerEngine.DESTINATION_NAME);
067
068 String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
069
070 Message message = null;
071
072 if (messageJSON == null) {
073 message = new Message();
074 }
075 else {
076 message = (Message)JSONFactoryUtil.deserialize(messageJSON);
077 }
078
079 message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
080
081 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
082 SchedulerEngine.JOB_STATE);
083
084 JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
085
086 JobKey jobKey = jobDetail.getKey();
087
088 if (jobExecutionContext.getNextFireTime() == null) {
089 message.put(SchedulerEngine.DISABLE, true);
090
091 StorageType storageType = StorageType.valueOf(
092 jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
093
094 if (PropsValues.CLUSTER_LINK_ENABLED &&
095 storageType.equals(StorageType.MEMORY_CLUSTERED)) {
096
097 notifyClusterMember(jobKey, storageType);
098 }
099
100 if (storageType.equals(StorageType.PERSISTED)) {
101 Scheduler scheduler = jobExecutionContext.getScheduler();
102
103 scheduler.deleteJob(jobKey);
104 }
105 }
106
107 message.put(SchedulerEngine.JOB_NAME, jobKey.getName());
108 message.put(SchedulerEngine.JOB_STATE, jobState);
109 message.put(SchedulerEngine.GROUP_NAME, jobKey.getGroup());
110
111 MessageBusUtil.sendMessage(destinationName, message);
112 }
113
114 protected void notifyClusterMember(JobKey jobKey, StorageType storageType)
115 throws Exception {
116
117 MethodHandler methodHandler = new MethodHandler(
118 _deleteJobMethodKey, jobKey.getName(), jobKey.getGroup(),
119 storageType);
120
121 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
122 methodHandler, true);
123
124 ClusterExecutorUtil.execute(clusterRequest);
125 }
126
127 private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
128
129 private static MethodKey _deleteJobMethodKey = new MethodKey(
130 SchedulerEngineHelperUtil.class.getName(), "delete", String.class,
131 String.class, StorageType.class);
132
133 }