001
014
015 package com.liferay.portal.scheduler.job;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Message;
023 import com.liferay.portal.kernel.messaging.MessageBusUtil;
024 import com.liferay.portal.kernel.scheduler.JobState;
025 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
026 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
027 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
028 import com.liferay.portal.kernel.scheduler.StorageType;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.util.PropsValues;
032
033 import java.util.Map;
034
035 import org.quartz.Job;
036 import org.quartz.JobDataMap;
037 import org.quartz.JobDetail;
038 import org.quartz.JobExecutionContext;
039 import org.quartz.JobKey;
040 import org.quartz.Scheduler;
041
042
046 public class MessageSenderJob implements Job {
047
048 @Override
049 public void execute(JobExecutionContext jobExecutionContext) {
050 try {
051 doExecute(jobExecutionContext);
052 }
053 catch (Exception e) {
054 _log.error("Unable to execute job", e);
055 }
056 }
057
058 protected void doExecute(JobExecutionContext jobExecutionContext)
059 throws Exception {
060
061 JobDetail jobDetail = jobExecutionContext.getJobDetail();
062
063 JobDataMap jobDataMap = jobDetail.getJobDataMap();
064
065 String destinationName = jobDataMap.getString(
066 SchedulerEngine.DESTINATION_NAME);
067
068 String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
069
070 Message message = null;
071
072 if (messageJSON == null) {
073 message = new Message();
074 }
075 else {
076 message = (Message)JSONFactoryUtil.deserialize(messageJSON);
077 }
078
079 message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
080
081 JobKey jobKey = jobDetail.getKey();
082
083 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
084 SchedulerEngine.JOB_STATE);
085
086 JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
087
088 StorageType storageType = StorageType.valueOf(
089 jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
090
091 if (jobExecutionContext.getNextFireTime() == null) {
092 message.put(SchedulerEngine.DISABLE, true);
093
094 if (PropsValues.CLUSTER_LINK_ENABLED &&
095 storageType.equals(StorageType.MEMORY_CLUSTERED)) {
096
097 notifyClusterMember(jobKey, storageType);
098 }
099
100 if (storageType.equals(StorageType.PERSISTED)) {
101 Scheduler scheduler = jobExecutionContext.getScheduler();
102
103 scheduler.deleteJob(jobKey);
104 }
105 }
106
107 message.put(SchedulerEngine.JOB_NAME, jobKey.getName());
108 message.put(SchedulerEngine.JOB_STATE, jobState);
109 message.put(SchedulerEngine.GROUP_NAME, jobKey.getGroup());
110 message.put(SchedulerEngine.STORAGE_TYPE, storageType);
111
112 MessageBusUtil.sendMessage(destinationName, message);
113 }
114
115 protected void notifyClusterMember(JobKey jobKey, StorageType storageType)
116 throws Exception {
117
118 MethodHandler methodHandler = new MethodHandler(
119 _deleteJobMethodKey, jobKey.getName(), jobKey.getGroup(),
120 storageType);
121
122 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
123 methodHandler, true);
124
125 ClusterExecutorUtil.execute(clusterRequest);
126 }
127
128 private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
129
130 private static MethodKey _deleteJobMethodKey = new MethodKey(
131 SchedulerEngineHelperUtil.class, "delete", String.class, String.class,
132 StorageType.class);
133
134 }