001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.TriggerType;
041 import com.liferay.portal.kernel.scheduler.messaging.ReceiverKey;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
043 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
044 import com.liferay.portal.kernel.util.CharPool;
045 import com.liferay.portal.kernel.util.ClassLoaderPool;
046 import com.liferay.portal.kernel.util.ProxyUtil;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.scheduler.job.MessageSenderJob;
049 import com.liferay.portal.service.QuartzLocalService;
050 import com.liferay.portal.util.ClassLoaderUtil;
051 import com.liferay.portal.util.PropsUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.util.ArrayList;
055 import java.util.Collections;
056 import java.util.Date;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.Properties;
060 import java.util.Set;
061
062 import org.quartz.CronScheduleBuilder;
063 import org.quartz.CronTrigger;
064 import org.quartz.JobBuilder;
065 import org.quartz.JobDataMap;
066 import org.quartz.JobDetail;
067 import org.quartz.JobKey;
068 import org.quartz.ObjectAlreadyExistsException;
069 import org.quartz.Scheduler;
070 import org.quartz.SimpleScheduleBuilder;
071 import org.quartz.SimpleTrigger;
072 import org.quartz.Trigger;
073 import org.quartz.TriggerBuilder;
074 import org.quartz.TriggerKey;
075 import org.quartz.impl.StdSchedulerFactory;
076 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
077 import org.quartz.impl.matchers.GroupMatcher;
078
079
086 public class QuartzSchedulerEngine implements SchedulerEngine {
087
088 public void afterPropertiesSet() {
089 if (!PropsValues.SCHEDULER_ENABLED) {
090 return;
091 }
092
093 try {
094 quartzLocalService.checkQuartzTables();
095
096 _persistedScheduler = initializeScheduler(
097 "persisted.scheduler.", true);
098
099 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
100 }
101 catch (Exception e) {
102 _log.error("Unable to initialize engine", e);
103 }
104 }
105
106 @Override
107 public void delete(String groupName) throws SchedulerException {
108 if (!PropsValues.SCHEDULER_ENABLED) {
109 return;
110 }
111
112 try {
113 Scheduler scheduler = getScheduler(groupName);
114
115 groupName = fixMaxLength(
116 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
117
118 Set<JobKey> jobKeys = scheduler.getJobKeys(
119 GroupMatcher.jobGroupEquals(groupName));
120
121 for (JobKey jobKey : jobKeys) {
122 unregisterMessageListener(scheduler, jobKey);
123
124 scheduler.deleteJob(jobKey);
125 }
126 }
127 catch (Exception e) {
128 throw new SchedulerException(
129 "Unable to delete jobs in group " + groupName, e);
130 }
131 }
132
133 @Override
134 public void delete(String jobName, String groupName)
135 throws SchedulerException {
136
137 if (!PropsValues.SCHEDULER_ENABLED) {
138 return;
139 }
140
141 try {
142 Scheduler scheduler = getScheduler(groupName);
143
144 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
145 groupName = fixMaxLength(
146 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
147
148 JobKey jobKey = new JobKey(jobName, groupName);
149
150 unregisterMessageListener(scheduler, jobKey);
151
152 scheduler.deleteJob(jobKey);
153 }
154 catch (Exception e) {
155 throw new SchedulerException(
156 "Unable to delete job {jobName=" + jobName + ", groupName=" +
157 groupName + "}",
158 e);
159 }
160 }
161
162 public void destroy() {
163 try {
164 shutdown();
165 }
166 catch (SchedulerException se) {
167 if (_log.isWarnEnabled()) {
168 _log.warn("Unable to shutdown", se);
169 }
170 }
171 }
172
173 @Override
174 public SchedulerResponse getScheduledJob(String jobName, String groupName)
175 throws SchedulerException {
176
177 if (!PropsValues.SCHEDULER_ENABLED) {
178 return null;
179 }
180
181 try {
182 Scheduler scheduler = getScheduler(groupName);
183
184 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
185 groupName = fixMaxLength(
186 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
187
188 JobKey jobKey = new JobKey(jobName, groupName);
189
190 return getScheduledJob(scheduler, jobKey);
191 }
192 catch (Exception e) {
193 throw new SchedulerException(
194 "Unable to get job {jobName=" + jobName + ", groupName=" +
195 groupName + "}",
196 e);
197 }
198 }
199
200 @Override
201 public List<SchedulerResponse> getScheduledJobs()
202 throws SchedulerException {
203
204 if (!PropsValues.SCHEDULER_ENABLED) {
205 return Collections.emptyList();
206 }
207
208 try {
209 List<String> groupNames = _persistedScheduler.getJobGroupNames();
210
211 List<SchedulerResponse> schedulerResponses =
212 new ArrayList<SchedulerResponse>();
213
214 for (String groupName : groupNames) {
215 schedulerResponses.addAll(
216 getScheduledJobs(_persistedScheduler, groupName));
217 }
218
219 groupNames = _memoryScheduler.getJobGroupNames();
220
221 for (String groupName : groupNames) {
222 schedulerResponses.addAll(
223 getScheduledJobs(_memoryScheduler, groupName));
224 }
225
226 return schedulerResponses;
227 }
228 catch (Exception e) {
229 throw new SchedulerException("Unable to get jobs", e);
230 }
231 }
232
233 @Override
234 public List<SchedulerResponse> getScheduledJobs(String groupName)
235 throws SchedulerException {
236
237 if (!PropsValues.SCHEDULER_ENABLED) {
238 return Collections.emptyList();
239 }
240
241 try {
242 Scheduler scheduler = getScheduler(groupName);
243
244 return getScheduledJobs(scheduler, groupName);
245 }
246 catch (Exception e) {
247 throw new SchedulerException(
248 "Unable to get jobs in group " + groupName, e);
249 }
250 }
251
252 @Override
253 public void pause(String groupName) throws SchedulerException {
254 if (!PropsValues.SCHEDULER_ENABLED) {
255 return;
256 }
257
258 try {
259 Scheduler scheduler = getScheduler(groupName);
260
261 groupName = fixMaxLength(
262 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
263
264 Set<JobKey> jobKeys = scheduler.getJobKeys(
265 GroupMatcher.jobGroupEquals(groupName));
266
267 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
268
269 for (JobKey jobKey : jobKeys) {
270 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
271 }
272 }
273 catch (Exception e) {
274 throw new SchedulerException(
275 "Unable to pause jobs in group " + groupName, e);
276 }
277 }
278
279 @Override
280 public void pause(String jobName, String groupName)
281 throws SchedulerException {
282
283 if (!PropsValues.SCHEDULER_ENABLED) {
284 return;
285 }
286
287 try {
288 Scheduler scheduler = getScheduler(groupName);
289
290 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
291 groupName = fixMaxLength(
292 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
293
294 JobKey jobKey = new JobKey(jobName, groupName);
295
296 scheduler.pauseJob(jobKey);
297
298 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
299 }
300 catch (Exception e) {
301 throw new SchedulerException(
302 "Unable to pause job {jobName=" + jobName + ", groupName=" +
303 groupName + "}",
304 e);
305 }
306 }
307
308 @Override
309 public void resume(String groupName) throws SchedulerException {
310 if (!PropsValues.SCHEDULER_ENABLED) {
311 return;
312 }
313
314 try {
315 Scheduler scheduler = getScheduler(groupName);
316
317 groupName = fixMaxLength(
318 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
319
320 Set<JobKey> jobKeys = scheduler.getJobKeys(
321 GroupMatcher.jobGroupEquals(groupName));
322
323 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
324
325 for (JobKey jobKey : jobKeys) {
326 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
327 }
328 }
329 catch (Exception e) {
330 throw new SchedulerException(
331 "Unable to resume jobs in group " + groupName, e);
332 }
333 }
334
335 @Override
336 public void resume(String jobName, String groupName)
337 throws SchedulerException {
338
339 if (!PropsValues.SCHEDULER_ENABLED) {
340 return;
341 }
342
343 try {
344 Scheduler scheduler = getScheduler(groupName);
345
346 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
347 groupName = fixMaxLength(
348 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
349
350 JobKey jobKey = new JobKey(jobName, groupName);
351
352 scheduler.resumeJob(jobKey);
353
354 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
355 }
356 catch (Exception e) {
357 throw new SchedulerException(
358 "Unable to resume job {jobName=" + jobName + ", groupName=" +
359 groupName + "}",
360 e);
361 }
362 }
363
364 @Override
365 public void schedule(
366 com.liferay.portal.kernel.scheduler.Trigger trigger,
367 String description, String destination, Message message)
368 throws SchedulerException {
369
370 if (!PropsValues.SCHEDULER_ENABLED) {
371 return;
372 }
373
374 try {
375 Scheduler scheduler = getScheduler(trigger.getGroupName());
376
377 StorageType storageType = getStorageType(trigger.getGroupName());
378
379 trigger = TriggerFactoryUtil.buildTrigger(
380 trigger.getTriggerType(), trigger.getJobName(),
381 getOriginalGroupName(trigger.getGroupName()),
382 trigger.getStartDate(), trigger.getEndDate(),
383 trigger.getTriggerContent());
384
385 Trigger quartzTrigger = getQuartzTrigger(trigger);
386
387 if (quartzTrigger == null) {
388 return;
389 }
390
391 description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
392
393 if (message == null) {
394 message = new Message();
395 }
396 else {
397 message = message.clone();
398 }
399
400 registerMessageListeners(
401 trigger.getJobName(), trigger.getGroupName(), destination,
402 message);
403
404 schedule(
405 scheduler, storageType, quartzTrigger, description, destination,
406 message);
407 }
408 catch (RuntimeException re) {
409
410
411
412
413 }
414 catch (Exception e) {
415 throw new SchedulerException("Unable to schedule job", e);
416 }
417 }
418
419 @Override
420 public void shutdown() throws SchedulerException {
421 if (!PropsValues.SCHEDULER_ENABLED) {
422 return;
423 }
424
425 try {
426 if (!_persistedScheduler.isShutdown()) {
427 _persistedScheduler.shutdown(false);
428 }
429
430 if (!_memoryScheduler.isShutdown()) {
431 _memoryScheduler.shutdown(false);
432 }
433 }
434 catch (Exception e) {
435 throw new SchedulerException("Unable to shutdown scheduler", e);
436 }
437 }
438
439 @Override
440 public void start() throws SchedulerException {
441 if (!PropsValues.SCHEDULER_ENABLED) {
442 return;
443 }
444
445 try {
446 _persistedScheduler.start();
447
448 initJobState();
449
450 _memoryScheduler.start();
451 }
452 catch (Exception e) {
453 throw new SchedulerException("Unable to start scheduler", e);
454 }
455 }
456
457 @Override
458 public void suppressError(String jobName, String groupName)
459 throws SchedulerException {
460
461 if (!PropsValues.SCHEDULER_ENABLED) {
462 return;
463 }
464
465 try {
466 Scheduler scheduler = getScheduler(groupName);
467
468 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
469 groupName = fixMaxLength(
470 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
471
472 JobKey jobKey = new JobKey(jobName, groupName);
473
474 updateJobState(scheduler, jobKey, null, true);
475 }
476 catch (Exception e) {
477 throw new SchedulerException(
478 "Unable to suppress error for job {jobName=" + jobName +
479 ", groupName=" + groupName + "}",
480 e);
481 }
482 }
483
484 @Override
485 public void unschedule(String groupName) throws SchedulerException {
486 if (!PropsValues.SCHEDULER_ENABLED) {
487 return;
488 }
489
490 try {
491 Scheduler scheduler = getScheduler(groupName);
492
493 groupName = fixMaxLength(
494 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
495
496 Set<JobKey> jobKeys = scheduler.getJobKeys(
497 GroupMatcher.jobGroupEquals(groupName));
498
499 for (JobKey jobKey : jobKeys) {
500 unschedule(scheduler, jobKey);
501 }
502 }
503 catch (Exception e) {
504 throw new SchedulerException(
505 "Unable to unschedule jobs in group " + groupName, e);
506 }
507 }
508
509 @Override
510 public void unschedule(String jobName, String groupName)
511 throws SchedulerException {
512
513 if (!PropsValues.SCHEDULER_ENABLED) {
514 return;
515 }
516
517 try {
518 Scheduler scheduler = getScheduler(groupName);
519
520 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
521 groupName = fixMaxLength(
522 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
523
524 JobKey jobKey = new JobKey(jobName, groupName);
525
526 unschedule(scheduler, jobKey);
527 }
528 catch (Exception e) {
529 throw new SchedulerException(
530 "Unable to unschedule job {jobName=" + jobName +
531 ", groupName=" + groupName + "}",
532 e);
533 }
534 }
535
536 @Override
537 public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
538 throws SchedulerException {
539
540 if (!PropsValues.SCHEDULER_ENABLED) {
541 return;
542 }
543
544 try {
545 Scheduler scheduler = getScheduler(trigger.getGroupName());
546
547 trigger = TriggerFactoryUtil.buildTrigger(
548 trigger.getTriggerType(), trigger.getJobName(),
549 getOriginalGroupName(trigger.getGroupName()),
550 trigger.getStartDate(), trigger.getEndDate(),
551 trigger.getTriggerContent());
552
553 update(scheduler, trigger);
554 }
555 catch (Exception e) {
556 throw new SchedulerException("Unable to update trigger", e);
557 }
558 }
559
560 protected String fixMaxLength(String argument, int maxLength) {
561 if (argument == null) {
562 return null;
563 }
564
565 if (argument.length() > maxLength) {
566 argument = argument.substring(0, maxLength);
567 }
568
569 return argument;
570 }
571
572 protected JobState getJobState(JobDataMap jobDataMap) {
573 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
574 JOB_STATE);
575
576 return JobStateSerializeUtil.deserialize(jobStateMap);
577 }
578
579 protected Message getMessage(JobDataMap jobDataMap) {
580 String messageJSON = (String)jobDataMap.get(MESSAGE);
581
582 return (Message)JSONFactoryUtil.deserialize(messageJSON);
583 }
584
585 protected MessageListener getMessageListener(
586 String messageListenerClassName, ClassLoader classLoader)
587 throws SchedulerException {
588
589 MessageListener schedulerEventListener = null;
590
591 try {
592 Class<? extends MessageListener> clazz =
593 (Class<? extends MessageListener>)classLoader.loadClass(
594 messageListenerClassName);
595
596 schedulerEventListener = clazz.newInstance();
597
598 schedulerEventListener =
599 (MessageListener)ProxyUtil.newProxyInstance(
600 classLoader, new Class<?>[] {MessageListener.class},
601 new ClassLoaderBeanHandler(
602 schedulerEventListener, classLoader));
603 }
604 catch (Exception e) {
605 throw new SchedulerException(
606 "Unable to register message listener with name " +
607 messageListenerClassName,
608 e);
609 }
610
611 return schedulerEventListener;
612 }
613
614 protected String getOriginalGroupName(String groupName) {
615 int pos = groupName.indexOf(CharPool.POUND);
616
617 return groupName.substring(pos + 1);
618 }
619
620 protected Trigger getQuartzTrigger(
621 com.liferay.portal.kernel.scheduler.Trigger trigger)
622 throws SchedulerException {
623
624 if (trigger == null) {
625 return null;
626 }
627
628 Date endDate = trigger.getEndDate();
629 String jobName = fixMaxLength(
630 trigger.getJobName(), JOB_NAME_MAX_LENGTH);
631 String groupName = fixMaxLength(
632 trigger.getGroupName(), GROUP_NAME_MAX_LENGTH);
633
634 Date startDate = trigger.getStartDate();
635
636 if (startDate == null) {
637 startDate = new Date(System.currentTimeMillis());
638 }
639
640 Trigger quartzTrigger = null;
641
642 TriggerType triggerType = trigger.getTriggerType();
643
644 if (triggerType.equals(TriggerType.CRON)) {
645 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
646
647 triggerBuilder.endAt(endDate);
648 triggerBuilder.forJob(jobName, groupName);
649 triggerBuilder.startAt(startDate);
650 triggerBuilder.withIdentity(jobName, groupName);
651
652 CronScheduleBuilder cronScheduleBuilder =
653 CronScheduleBuilder.cronSchedule(
654 (String)trigger.getTriggerContent());
655
656 triggerBuilder.withSchedule(cronScheduleBuilder);
657
658 quartzTrigger = triggerBuilder.build();
659 }
660 else if (triggerType.equals(TriggerType.SIMPLE)) {
661 long interval = (Long)trigger.getTriggerContent();
662
663 if (interval <= 0) {
664 if (_log.isDebugEnabled()) {
665 _log.debug(
666 "Not scheduling " + trigger.getJobName() +
667 " because interval is less than or equal to 0");
668 }
669
670 return null;
671 }
672
673 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
674
675 triggerBuilder.endAt(endDate);
676 triggerBuilder.forJob(jobName, groupName);
677 triggerBuilder.startAt(startDate);
678 triggerBuilder.withIdentity(jobName, groupName);
679
680 SimpleScheduleBuilder simpleScheduleBuilder =
681 SimpleScheduleBuilder.simpleSchedule();
682
683 simpleScheduleBuilder.withIntervalInMilliseconds(interval);
684 simpleScheduleBuilder.withRepeatCount(
685 SimpleTrigger.REPEAT_INDEFINITELY);
686
687 triggerBuilder.withSchedule(simpleScheduleBuilder);
688
689 quartzTrigger = triggerBuilder.build();
690 }
691 else {
692 throw new SchedulerException(
693 "Unknown trigger type " + trigger.getTriggerType());
694 }
695
696 return quartzTrigger;
697 }
698
699 protected SchedulerResponse getScheduledJob(
700 Scheduler scheduler, JobKey jobKey)
701 throws Exception {
702
703 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
704
705 if (jobDetail == null) {
706 return null;
707 }
708
709 JobDataMap jobDataMap = jobDetail.getJobDataMap();
710
711 String description = jobDataMap.getString(DESCRIPTION);
712 String destinationName = jobDataMap.getString(DESTINATION_NAME);
713 Message message = getMessage(jobDataMap);
714 StorageType storageType = StorageType.valueOf(
715 jobDataMap.getString(STORAGE_TYPE));
716
717 SchedulerResponse schedulerResponse = null;
718
719 String jobName = jobKey.getName();
720 String groupName = jobKey.getGroup();
721
722 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
723
724 Trigger trigger = scheduler.getTrigger(triggerKey);
725
726 JobState jobState = getJobState(jobDataMap);
727
728 message.put(JOB_STATE, jobState);
729
730 if (trigger == null) {
731 schedulerResponse = new SchedulerResponse();
732
733 schedulerResponse.setDescription(description);
734 schedulerResponse.setDestinationName(destinationName);
735 schedulerResponse.setGroupName(groupName);
736 schedulerResponse.setJobName(jobName);
737 schedulerResponse.setMessage(message);
738 schedulerResponse.setStorageType(storageType);
739 }
740 else {
741 message.put(END_TIME, trigger.getEndTime());
742 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
743 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
744 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
745 message.put(START_TIME, trigger.getStartTime());
746
747 if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
748 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
749
750 schedulerResponse = new SchedulerResponse();
751
752 schedulerResponse.setDescription(description);
753 schedulerResponse.setDestinationName(destinationName);
754 schedulerResponse.setMessage(message);
755 schedulerResponse.setStorageType(storageType);
756 schedulerResponse.setTrigger(
757 new com.liferay.portal.kernel.scheduler.CronTrigger(
758 jobName, groupName, cronTrigger.getStartTime(),
759 cronTrigger.getEndTime(),
760 cronTrigger.getCronExpression()));
761 }
762 else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
763 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
764
765 schedulerResponse = new SchedulerResponse();
766
767 schedulerResponse.setDescription(description);
768 schedulerResponse.setDestinationName(destinationName);
769 schedulerResponse.setMessage(message);
770 schedulerResponse.setStorageType(storageType);
771 schedulerResponse.setTrigger(
772 new IntervalTrigger(
773 jobName, groupName, simpleTrigger.getStartTime(),
774 simpleTrigger.getEndTime(),
775 simpleTrigger.getRepeatInterval()));
776 }
777 }
778
779 return schedulerResponse;
780 }
781
782 protected List<SchedulerResponse> getScheduledJobs(
783 Scheduler scheduler, String groupName)
784 throws Exception {
785
786 groupName = fixMaxLength(
787 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
788
789 List<SchedulerResponse> schedulerResponses =
790 new ArrayList<SchedulerResponse>();
791
792 Set<JobKey> jobKeys = scheduler.getJobKeys(
793 GroupMatcher.jobGroupEquals(groupName));
794
795 for (JobKey jobKey : jobKeys) {
796 SchedulerResponse schedulerResponse = getScheduledJob(
797 scheduler, jobKey);
798
799 if (schedulerResponse != null) {
800 schedulerResponses.add(schedulerResponse);
801 }
802 }
803
804 return schedulerResponses;
805 }
806
807 protected Scheduler getScheduler(String groupName) throws Exception {
808 if (groupName.startsWith(StorageType.PERSISTED.toString())) {
809 return _persistedScheduler;
810 }
811 else {
812 return _memoryScheduler;
813 }
814 }
815
816 protected StorageType getStorageType(String groupName) {
817 int pos = groupName.indexOf(CharPool.POUND);
818
819 String storageTypeString = groupName.substring(0, pos);
820
821 return StorageType.valueOf(storageTypeString);
822 }
823
824 protected Scheduler initializeScheduler(
825 String propertiesPrefix, boolean useQuartzCluster)
826 throws Exception {
827
828 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
829
830 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
831
832 if (useQuartzCluster) {
833 DB db = DBFactoryUtil.getDB();
834
835 String dbType = db.getType();
836
837 if (dbType.equals(DB.TYPE_SQLSERVER)) {
838 String lockHandlerClassName = properties.getProperty(
839 "org.quartz.jobStore.lockHandler.class");
840
841 if (Validator.isNull(lockHandlerClassName)) {
842 properties.setProperty(
843 "org.quartz.jobStore.lockHandler.class",
844 UpdateLockRowSemaphore.class.getName());
845 }
846 }
847
848 if (PropsValues.CLUSTER_LINK_ENABLED) {
849 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
850 _log.error("Unable to cluster scheduler on Hypersonic");
851 }
852 else {
853 properties.put(
854 "org.quartz.jobStore.isClustered",
855 Boolean.TRUE.toString());
856 }
857 }
858 }
859
860 schedulerFactory.initialize(properties);
861
862 return schedulerFactory.getScheduler();
863 }
864
865 protected void initJobState() throws Exception {
866 List<String> groupNames = _persistedScheduler.getJobGroupNames();
867
868 for (String groupName : groupNames) {
869 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
870 GroupMatcher.jobGroupEquals(groupName));
871
872 for (JobKey jobKey : jobkeys) {
873 Trigger trigger = _persistedScheduler.getTrigger(
874 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
875
876 if (trigger != null) {
877 continue;
878 }
879
880 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
881
882 JobDataMap jobDataMap = jobDetail.getJobDataMap();
883
884 Message message = getMessage(jobDataMap);
885
886 message.put(JOB_NAME, jobKey.getName());
887 message.put(GROUP_NAME, jobKey.getGroup());
888
889 SchedulerEngineHelperUtil.auditSchedulerJobs(
890 message, TriggerState.EXPIRED);
891
892 _persistedScheduler.deleteJob(jobKey);
893 }
894 }
895 }
896
897 protected void registerMessageListeners(
898 String jobName, String groupName, String destinationName,
899 Message message)
900 throws SchedulerException {
901
902 String messageListenerClassName = message.getString(
903 MESSAGE_LISTENER_CLASS_NAME);
904
905 if (Validator.isNull(messageListenerClassName)) {
906 return;
907 }
908
909 String portletId = message.getString(PORTLET_ID);
910
911 ClassLoader classLoader = null;
912
913 if (Validator.isNull(portletId)) {
914 classLoader = ClassLoaderUtil.getPortalClassLoader();
915 }
916 else {
917 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
918
919 if (classLoader == null) {
920
921
922
923
924
925 classLoader = ClassLoaderPool.getClassLoader(portletId);
926 }
927 }
928
929 if (classLoader == null) {
930 throw new SchedulerException(
931 "Unable to find class loader for portlet " + portletId);
932 }
933
934 MessageListener schedulerEventListener = getMessageListener(
935 messageListenerClassName, classLoader);
936
937 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
938 new SchedulerEventMessageListenerWrapper();
939
940 schedulerEventListenerWrapper.setGroupName(groupName);
941 schedulerEventListenerWrapper.setJobName(jobName);
942 schedulerEventListenerWrapper.setMessageListener(
943 schedulerEventListener);
944
945 schedulerEventListenerWrapper.afterPropertiesSet();
946
947 MessageBusUtil.registerMessageListener(
948 destinationName, schedulerEventListenerWrapper);
949
950 message.put(
951 MESSAGE_LISTENER_UUID,
952 schedulerEventListenerWrapper.getMessageListenerUUID());
953
954 message.put(RECEIVER_KEY, new ReceiverKey(jobName, groupName));
955 }
956
957 protected void schedule(
958 Scheduler scheduler, StorageType storageType, Trigger trigger,
959 String description, String destinationName, Message message)
960 throws Exception {
961
962 try {
963 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
964
965 jobBuilder.withIdentity(trigger.getJobKey());
966
967 jobBuilder.storeDurably();
968
969 JobDetail jobDetail = jobBuilder.build();
970
971 JobDataMap jobDataMap = jobDetail.getJobDataMap();
972
973 jobDataMap.put(DESCRIPTION, description);
974 jobDataMap.put(DESTINATION_NAME, destinationName);
975 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
976 jobDataMap.put(STORAGE_TYPE, storageType.toString());
977
978 JobState jobState = new JobState(
979 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
980
981 jobDataMap.put(
982 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
983
984 unregisterMessageListener(scheduler, trigger.getJobKey());
985
986 synchronized (this) {
987 scheduler.deleteJob(trigger.getJobKey());
988 scheduler.scheduleJob(jobDetail, trigger);
989 }
990 }
991 catch (ObjectAlreadyExistsException oaee) {
992 if (_log.isInfoEnabled()) {
993 _log.info("Message is already scheduled");
994 }
995 }
996 }
997
998 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
999 throws Exception {
1000
1001 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1002
1003 if (jobDetail == null) {
1004 return;
1005 }
1006
1007 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1008
1009 if (jobDataMap == null) {
1010 return;
1011 }
1012
1013 Message message = getMessage(jobDataMap);
1014
1015 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
1016
1017 if (messageListenerUUID == null) {
1018 return;
1019 }
1020
1021 String destinationName = jobDataMap.getString(DESTINATION_NAME);
1022
1023 MessageBus messageBus = MessageBusUtil.getMessageBus();
1024
1025 Destination destination = messageBus.getDestination(destinationName);
1026
1027 if (destination == null) {
1028 return;
1029 }
1030
1031 Set<MessageListener> messageListeners =
1032 destination.getMessageListeners();
1033
1034 for (MessageListener messageListener : messageListeners) {
1035 if (!(messageListener instanceof InvokerMessageListener)) {
1036 continue;
1037 }
1038
1039 InvokerMessageListener invokerMessageListener =
1040 (InvokerMessageListener)messageListener;
1041
1042 messageListener = invokerMessageListener.getMessageListener();
1043
1044 if (!(messageListener instanceof
1045 SchedulerEventMessageListenerWrapper)) {
1046
1047 continue;
1048 }
1049
1050 SchedulerEventMessageListenerWrapper schedulerMessageListener =
1051 (SchedulerEventMessageListenerWrapper)messageListener;
1052
1053 if (messageListenerUUID.equals(
1054 schedulerMessageListener.getMessageListenerUUID())) {
1055
1056 messageBus.unregisterMessageListener(
1057 destinationName, schedulerMessageListener);
1058
1059 return;
1060 }
1061 }
1062 }
1063
1064 protected void unschedule(Scheduler scheduler, JobKey jobKey)
1065 throws Exception {
1066
1067 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1068
1069 TriggerKey triggerKey = new TriggerKey(
1070 jobKey.getName(), jobKey.getGroup());
1071
1072 if (jobDetail == null) {
1073 return;
1074 }
1075
1076 unregisterMessageListener(scheduler, jobKey);
1077
1078 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1079
1080 JobState jobState = getJobState(jobDataMap);
1081
1082 Trigger trigger = scheduler.getTrigger(triggerKey);
1083
1084 jobState.setTriggerDate(END_TIME, new Date());
1085 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1086 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1087 jobState.setTriggerDate(
1088 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1089 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1090
1091 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1092
1093 jobState.clearExceptions();
1094
1095 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1096
1097 scheduler.unscheduleJob(triggerKey);
1098
1099 scheduler.addJob(jobDetail, true);
1100 }
1101
1102 protected void update(
1103 Scheduler scheduler,
1104 com.liferay.portal.kernel.scheduler.Trigger trigger)
1105 throws Exception {
1106
1107 Trigger quartzTrigger = getQuartzTrigger(trigger);
1108
1109 if (quartzTrigger == null) {
1110 return;
1111 }
1112
1113 TriggerKey triggerKey = quartzTrigger.getKey();
1114
1115 if (scheduler.getTrigger(triggerKey) != null) {
1116 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1117 }
1118 else {
1119 JobKey jobKey = quartzTrigger.getJobKey();
1120
1121 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1122
1123 if (jobDetail == null) {
1124 return;
1125 }
1126
1127 synchronized (this) {
1128 scheduler.deleteJob(jobKey);
1129 scheduler.scheduleJob(jobDetail, quartzTrigger);
1130 }
1131
1132 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1133 }
1134 }
1135
1136 protected void updateJobState(
1137 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1138 boolean suppressError)
1139 throws Exception {
1140
1141 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1142
1143 if (jobDetail == null) {
1144 return;
1145 }
1146
1147 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1148
1149 JobState jobState = getJobState(jobDataMap);
1150
1151 if (triggerState != null) {
1152 jobState.setTriggerState(triggerState);
1153 }
1154
1155 if (suppressError) {
1156 jobState.clearExceptions();
1157 }
1158
1159 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1160
1161 scheduler.addJob(jobDetail, true);
1162 }
1163
1164 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1165 protected QuartzLocalService quartzLocalService;
1166
1167 private static Log _log = LogFactoryUtil.getLog(
1168 QuartzSchedulerEngine.class);
1169
1170 private Scheduler _memoryScheduler;
1171 private Scheduler _persistedScheduler;
1172
1173 }