001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler.job;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterRequest;
019    import com.liferay.portal.kernel.json.JSONFactoryUtil;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.messaging.Message;
023    import com.liferay.portal.kernel.messaging.MessageBusUtil;
024    import com.liferay.portal.kernel.scheduler.JobState;
025    import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
026    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
027    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
028    import com.liferay.portal.kernel.scheduler.StorageType;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    import com.liferay.portal.kernel.util.MethodKey;
031    import com.liferay.portal.util.PropsValues;
032    
033    import java.util.Map;
034    
035    import org.quartz.Job;
036    import org.quartz.JobDataMap;
037    import org.quartz.JobDetail;
038    import org.quartz.JobExecutionContext;
039    import org.quartz.JobKey;
040    import org.quartz.Scheduler;
041    
042    /**
043     * @author Michael C. Han
044     * @author Bruno Farache
045     */
046    public class MessageSenderJob implements Job {
047    
048            @Override
049            public void execute(JobExecutionContext jobExecutionContext) {
050                    try {
051                            doExecute(jobExecutionContext);
052                    }
053                    catch (Exception e) {
054                            _log.error("Unable to execute job", e);
055                    }
056            }
057    
058            protected void doExecute(JobExecutionContext jobExecutionContext)
059                    throws Exception {
060    
061                    JobDetail jobDetail = jobExecutionContext.getJobDetail();
062    
063                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
064    
065                    String destinationName = jobDataMap.getString(
066                            SchedulerEngine.DESTINATION_NAME);
067    
068                    String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
069    
070                    Message message = null;
071    
072                    if (messageJSON == null) {
073                            message = new Message();
074                    }
075                    else {
076                            message = (Message)JSONFactoryUtil.deserialize(messageJSON);
077                    }
078    
079                    message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
080    
081                    Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
082                            SchedulerEngine.JOB_STATE);
083    
084                    JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
085    
086                    JobKey jobKey = jobDetail.getKey();
087    
088                    if (jobExecutionContext.getNextFireTime() == null) {
089                            message.put(SchedulerEngine.DISABLE, true);
090    
091                            StorageType storageType = StorageType.valueOf(
092                                    jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
093    
094                            if (PropsValues.CLUSTER_LINK_ENABLED &&
095                                    storageType.equals(StorageType.MEMORY_CLUSTERED)) {
096    
097                                    notifyClusterMember(jobKey, storageType);
098                            }
099    
100                            if (storageType.equals(StorageType.PERSISTED)) {
101                                    Scheduler scheduler = jobExecutionContext.getScheduler();
102    
103                                    scheduler.deleteJob(jobKey);
104                            }
105                    }
106    
107                    message.put(SchedulerEngine.JOB_NAME, jobKey.getName());
108                    message.put(SchedulerEngine.JOB_STATE, jobState);
109                    message.put(SchedulerEngine.GROUP_NAME, jobKey.getGroup());
110    
111                    MessageBusUtil.sendMessage(destinationName, message);
112            }
113    
114            protected void notifyClusterMember(JobKey jobKey, StorageType storageType)
115                    throws Exception {
116    
117                    MethodHandler methodHandler = new MethodHandler(
118                            _deleteJobMethodKey, jobKey.getName(), jobKey.getGroup(),
119                            storageType);
120    
121                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
122                            methodHandler, true);
123    
124                    ClusterExecutorUtil.execute(clusterRequest);
125            }
126    
127            private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
128    
129            private static MethodKey _deleteJobMethodKey = new MethodKey(
130                    SchedulerEngineHelperUtil.class.getName(), "delete", String.class,
131                    String.class, StorageType.class);
132    
133    }