001
014
015 package com.liferay.portal.scheduler.quartz;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.ClassLoaderBeanHandler;
019 import com.liferay.portal.kernel.dao.db.DB;
020 import com.liferay.portal.kernel.dao.db.DBFactoryUtil;
021 import com.liferay.portal.kernel.json.JSONFactoryUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.Destination;
025 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
026 import com.liferay.portal.kernel.messaging.Message;
027 import com.liferay.portal.kernel.messaging.MessageBus;
028 import com.liferay.portal.kernel.messaging.MessageBusUtil;
029 import com.liferay.portal.kernel.messaging.MessageListener;
030 import com.liferay.portal.kernel.portlet.PortletClassLoaderUtil;
031 import com.liferay.portal.kernel.scheduler.IntervalTrigger;
032 import com.liferay.portal.kernel.scheduler.JobState;
033 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
036 import com.liferay.portal.kernel.scheduler.SchedulerException;
037 import com.liferay.portal.kernel.scheduler.StorageType;
038 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
039 import com.liferay.portal.kernel.scheduler.TriggerState;
040 import com.liferay.portal.kernel.scheduler.TriggerType;
041 import com.liferay.portal.kernel.scheduler.messaging.SchedulerEventMessageListenerWrapper;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.CharPool;
044 import com.liferay.portal.kernel.util.ClassLoaderPool;
045 import com.liferay.portal.kernel.util.ProxyUtil;
046 import com.liferay.portal.kernel.util.StringPool;
047 import com.liferay.portal.kernel.util.Validator;
048 import com.liferay.portal.scheduler.job.MessageSenderJob;
049 import com.liferay.portal.service.QuartzLocalService;
050 import com.liferay.portal.util.ClassLoaderUtil;
051 import com.liferay.portal.util.PropsUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.util.ArrayList;
055 import java.util.Collections;
056 import java.util.Date;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.Properties;
060 import java.util.Set;
061
062 import org.quartz.CronScheduleBuilder;
063 import org.quartz.CronTrigger;
064 import org.quartz.JobBuilder;
065 import org.quartz.JobDataMap;
066 import org.quartz.JobDetail;
067 import org.quartz.JobKey;
068 import org.quartz.ObjectAlreadyExistsException;
069 import org.quartz.Scheduler;
070 import org.quartz.SimpleScheduleBuilder;
071 import org.quartz.SimpleTrigger;
072 import org.quartz.Trigger;
073 import org.quartz.TriggerBuilder;
074 import org.quartz.TriggerKey;
075 import org.quartz.impl.StdSchedulerFactory;
076 import org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore;
077 import org.quartz.impl.matchers.GroupMatcher;
078
079
086 public class QuartzSchedulerEngine implements SchedulerEngine {
087
088 public void afterPropertiesSet() {
089 if (!PropsValues.SCHEDULER_ENABLED) {
090 return;
091 }
092
093 try {
094 quartzLocalService.checkQuartzTables();
095
096 _persistedScheduler = initializeScheduler(
097 "persisted.scheduler.", true);
098
099 _memoryScheduler = initializeScheduler("memory.scheduler.", false);
100 }
101 catch (Exception e) {
102 _log.error("Unable to initialize engine", e);
103 }
104 }
105
106 @Override
107 public void delete(String groupName) throws SchedulerException {
108 if (!PropsValues.SCHEDULER_ENABLED) {
109 return;
110 }
111
112 try {
113 Scheduler scheduler = getScheduler(groupName);
114
115 groupName = fixMaxLength(
116 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
117
118 Set<JobKey> jobKeys = scheduler.getJobKeys(
119 GroupMatcher.jobGroupEquals(groupName));
120
121 for (JobKey jobKey : jobKeys) {
122 unregisterMessageListener(scheduler, jobKey);
123
124 scheduler.deleteJob(jobKey);
125 }
126 }
127 catch (Exception e) {
128 throw new SchedulerException(
129 "Unable to delete jobs in group " + groupName, e);
130 }
131 }
132
133 @Override
134 public void delete(String jobName, String groupName)
135 throws SchedulerException {
136
137 if (!PropsValues.SCHEDULER_ENABLED) {
138 return;
139 }
140
141 try {
142 Scheduler scheduler = getScheduler(groupName);
143
144 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
145 groupName = fixMaxLength(
146 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
147
148 JobKey jobKey = new JobKey(jobName, groupName);
149
150 unregisterMessageListener(scheduler, jobKey);
151
152 scheduler.deleteJob(jobKey);
153 }
154 catch (Exception e) {
155 throw new SchedulerException(
156 "Unable to delete job {jobName=" + jobName + ", groupName=" +
157 groupName + "}",
158 e);
159 }
160 }
161
162 public void destroy() {
163 try {
164 shutdown();
165 }
166 catch (SchedulerException se) {
167 if (_log.isWarnEnabled()) {
168 _log.warn("Unable to shutdown", se);
169 }
170 }
171 }
172
173 @Override
174 public SchedulerResponse getScheduledJob(String jobName, String groupName)
175 throws SchedulerException {
176
177 if (!PropsValues.SCHEDULER_ENABLED) {
178 return null;
179 }
180
181 try {
182 Scheduler scheduler = getScheduler(groupName);
183
184 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
185 groupName = fixMaxLength(
186 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
187
188 JobKey jobKey = new JobKey(jobName, groupName);
189
190 return getScheduledJob(scheduler, jobKey);
191 }
192 catch (Exception e) {
193 throw new SchedulerException(
194 "Unable to get job {jobName=" + jobName + ", groupName=" +
195 groupName + "}",
196 e);
197 }
198 }
199
200 @Override
201 public List<SchedulerResponse> getScheduledJobs()
202 throws SchedulerException {
203
204 if (!PropsValues.SCHEDULER_ENABLED) {
205 return Collections.emptyList();
206 }
207
208 try {
209 List<String> groupNames = _persistedScheduler.getJobGroupNames();
210
211 List<SchedulerResponse> schedulerResponses =
212 new ArrayList<SchedulerResponse>();
213
214 for (String groupName : groupNames) {
215 schedulerResponses.addAll(
216 getScheduledJobs(_persistedScheduler, groupName));
217 }
218
219 groupNames = _memoryScheduler.getJobGroupNames();
220
221 for (String groupName : groupNames) {
222 schedulerResponses.addAll(
223 getScheduledJobs(_memoryScheduler, groupName));
224 }
225
226 return schedulerResponses;
227 }
228 catch (Exception e) {
229 throw new SchedulerException("Unable to get jobs", e);
230 }
231 }
232
233 @Override
234 public List<SchedulerResponse> getScheduledJobs(String groupName)
235 throws SchedulerException {
236
237 if (!PropsValues.SCHEDULER_ENABLED) {
238 return Collections.emptyList();
239 }
240
241 try {
242 Scheduler scheduler = getScheduler(groupName);
243
244 return getScheduledJobs(scheduler, groupName);
245 }
246 catch (Exception e) {
247 throw new SchedulerException(
248 "Unable to get jobs in group " + groupName, e);
249 }
250 }
251
252 @Override
253 public void pause(String groupName) throws SchedulerException {
254 if (!PropsValues.SCHEDULER_ENABLED) {
255 return;
256 }
257
258 try {
259 Scheduler scheduler = getScheduler(groupName);
260
261 groupName = fixMaxLength(
262 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
263
264 Set<JobKey> jobKeys = scheduler.getJobKeys(
265 GroupMatcher.jobGroupEquals(groupName));
266
267 for (JobKey jobKey : jobKeys) {
268 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
269 }
270
271 scheduler.pauseJobs(GroupMatcher.jobGroupEquals(groupName));
272 }
273 catch (Exception e) {
274 throw new SchedulerException(
275 "Unable to pause jobs in group " + groupName, e);
276 }
277 }
278
279 @Override
280 public void pause(String jobName, String groupName)
281 throws SchedulerException {
282
283 if (!PropsValues.SCHEDULER_ENABLED) {
284 return;
285 }
286
287 try {
288 Scheduler scheduler = getScheduler(groupName);
289
290 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
291 groupName = fixMaxLength(
292 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
293
294 JobKey jobKey = new JobKey(jobName, groupName);
295
296 updateJobState(scheduler, jobKey, TriggerState.PAUSED, false);
297
298 scheduler.pauseJob(jobKey);
299 }
300 catch (Exception e) {
301 throw new SchedulerException(
302 "Unable to pause job {jobName=" + jobName + ", groupName=" +
303 groupName + "}",
304 e);
305 }
306 }
307
308 @Override
309 public void resume(String groupName) throws SchedulerException {
310 if (!PropsValues.SCHEDULER_ENABLED) {
311 return;
312 }
313
314 try {
315 Scheduler scheduler = getScheduler(groupName);
316
317 groupName = fixMaxLength(
318 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
319
320 Set<JobKey> jobKeys = scheduler.getJobKeys(
321 GroupMatcher.jobGroupEquals(groupName));
322
323 for (JobKey jobKey : jobKeys) {
324 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
325 }
326
327 scheduler.resumeJobs(GroupMatcher.jobGroupEquals(groupName));
328 }
329 catch (Exception e) {
330 throw new SchedulerException(
331 "Unable to resume jobs in group " + groupName, e);
332 }
333 }
334
335 @Override
336 public void resume(String jobName, String groupName)
337 throws SchedulerException {
338
339 if (!PropsValues.SCHEDULER_ENABLED) {
340 return;
341 }
342
343 try {
344 Scheduler scheduler = getScheduler(groupName);
345
346 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
347 groupName = fixMaxLength(
348 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
349
350 JobKey jobKey = new JobKey(jobName, groupName);
351
352 updateJobState(scheduler, jobKey, TriggerState.NORMAL, false);
353
354 scheduler.resumeJob(jobKey);
355 }
356 catch (Exception e) {
357 throw new SchedulerException(
358 "Unable to resume job {jobName=" + jobName + ", groupName=" +
359 groupName + "}",
360 e);
361 }
362 }
363
364 @Override
365 public void schedule(
366 com.liferay.portal.kernel.scheduler.Trigger trigger,
367 String description, String destination, Message message)
368 throws SchedulerException {
369
370 if (!PropsValues.SCHEDULER_ENABLED) {
371 return;
372 }
373
374 try {
375 Scheduler scheduler = getScheduler(trigger.getGroupName());
376
377 StorageType storageType = getStorageType(trigger.getGroupName());
378
379 trigger = TriggerFactoryUtil.buildTrigger(
380 trigger.getTriggerType(), trigger.getJobName(),
381 getOriginalGroupName(trigger.getGroupName()),
382 trigger.getStartDate(), trigger.getEndDate(),
383 trigger.getTriggerContent());
384
385 Trigger quartzTrigger = getQuartzTrigger(trigger);
386
387 if (quartzTrigger == null) {
388 return;
389 }
390
391 description = fixMaxLength(description, DESCRIPTION_MAX_LENGTH);
392
393 if (message == null) {
394 message = new Message();
395 }
396 else {
397 message = message.clone();
398 }
399
400 registerMessageListeners(
401 trigger.getJobName(), trigger.getGroupName(), destination,
402 message);
403
404 schedule(
405 scheduler, storageType, quartzTrigger, description, destination,
406 message);
407 }
408 catch (RuntimeException re) {
409
410
411
412
413 }
414 catch (Exception e) {
415 throw new SchedulerException("Unable to schedule job", e);
416 }
417 }
418
419 @Override
420 public void shutdown() throws SchedulerException {
421 if (!PropsValues.SCHEDULER_ENABLED) {
422 return;
423 }
424
425 try {
426 if (!_persistedScheduler.isShutdown()) {
427 _persistedScheduler.shutdown(false);
428 }
429
430 if (!_memoryScheduler.isShutdown()) {
431 _memoryScheduler.shutdown(false);
432 }
433 }
434 catch (Exception e) {
435 throw new SchedulerException("Unable to shutdown scheduler", e);
436 }
437 }
438
439 @Override
440 public void start() throws SchedulerException {
441 if (!PropsValues.SCHEDULER_ENABLED) {
442 return;
443 }
444
445 try {
446 _persistedScheduler.start();
447
448 initJobState();
449
450 _memoryScheduler.start();
451 }
452 catch (Exception e) {
453 throw new SchedulerException("Unable to start scheduler", e);
454 }
455 }
456
457 @Override
458 public void suppressError(String jobName, String groupName)
459 throws SchedulerException {
460
461 if (!PropsValues.SCHEDULER_ENABLED) {
462 return;
463 }
464
465 try {
466 Scheduler scheduler = getScheduler(groupName);
467
468 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
469 groupName = fixMaxLength(
470 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
471
472 JobKey jobKey = new JobKey(jobName, groupName);
473
474 updateJobState(scheduler, jobKey, null, true);
475 }
476 catch (Exception e) {
477 throw new SchedulerException(
478 "Unable to suppress error for job {jobName=" + jobName +
479 ", groupName=" + groupName + "}",
480 e);
481 }
482 }
483
484 @Override
485 public void unschedule(String groupName) throws SchedulerException {
486 if (!PropsValues.SCHEDULER_ENABLED) {
487 return;
488 }
489
490 try {
491 Scheduler scheduler = getScheduler(groupName);
492
493 groupName = fixMaxLength(
494 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
495
496 Set<JobKey> jobKeys = scheduler.getJobKeys(
497 GroupMatcher.jobGroupEquals(groupName));
498
499 for (JobKey jobKey : jobKeys) {
500 unschedule(scheduler, jobKey);
501 }
502 }
503 catch (Exception e) {
504 throw new SchedulerException(
505 "Unable to unschedule jobs in group " + groupName, e);
506 }
507 }
508
509 @Override
510 public void unschedule(String jobName, String groupName)
511 throws SchedulerException {
512
513 if (!PropsValues.SCHEDULER_ENABLED) {
514 return;
515 }
516
517 try {
518 Scheduler scheduler = getScheduler(groupName);
519
520 jobName = fixMaxLength(jobName, JOB_NAME_MAX_LENGTH);
521 groupName = fixMaxLength(
522 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
523
524 JobKey jobKey = new JobKey(jobName, groupName);
525
526 unschedule(scheduler, jobKey);
527 }
528 catch (Exception e) {
529 throw new SchedulerException(
530 "Unable to unschedule job {jobName=" + jobName +
531 ", groupName=" + groupName + "}",
532 e);
533 }
534 }
535
536 @Override
537 public void update(com.liferay.portal.kernel.scheduler.Trigger trigger)
538 throws SchedulerException {
539
540 if (!PropsValues.SCHEDULER_ENABLED) {
541 return;
542 }
543
544 try {
545 Scheduler scheduler = getScheduler(trigger.getGroupName());
546
547 trigger = TriggerFactoryUtil.buildTrigger(
548 trigger.getTriggerType(), trigger.getJobName(),
549 getOriginalGroupName(trigger.getGroupName()),
550 trigger.getStartDate(), trigger.getEndDate(),
551 trigger.getTriggerContent());
552
553 update(scheduler, trigger);
554 }
555 catch (Exception e) {
556 throw new SchedulerException("Unable to update trigger", e);
557 }
558 }
559
560 protected String fixMaxLength(String argument, int maxLength) {
561 if (argument == null) {
562 return null;
563 }
564
565 if (argument.length() > maxLength) {
566 argument = argument.substring(0, maxLength);
567 }
568
569 return argument;
570 }
571
572 protected String getFullName(String jobName, String groupName) {
573 return groupName.concat(StringPool.PERIOD).concat(jobName);
574 }
575
576 protected JobState getJobState(JobDataMap jobDataMap) {
577 Map<String, Object> jobStateMap = (Map<String, Object>)jobDataMap.get(
578 JOB_STATE);
579
580 return JobStateSerializeUtil.deserialize(jobStateMap);
581 }
582
583 protected Message getMessage(JobDataMap jobDataMap) {
584 String messageJSON = (String)jobDataMap.get(MESSAGE);
585
586 return (Message)JSONFactoryUtil.deserialize(messageJSON);
587 }
588
589 protected MessageListener getMessageListener(
590 String messageListenerClassName, ClassLoader classLoader)
591 throws SchedulerException {
592
593 MessageListener schedulerEventListener = null;
594
595 try {
596 Class<? extends MessageListener> clazz =
597 (Class<? extends MessageListener>)classLoader.loadClass(
598 messageListenerClassName);
599
600 schedulerEventListener = clazz.newInstance();
601
602 schedulerEventListener =
603 (MessageListener)ProxyUtil.newProxyInstance(
604 classLoader, new Class<?>[] {MessageListener.class},
605 new ClassLoaderBeanHandler(
606 schedulerEventListener, classLoader));
607 }
608 catch (Exception e) {
609 throw new SchedulerException(
610 "Unable to register message listener with name " +
611 messageListenerClassName,
612 e);
613 }
614
615 return schedulerEventListener;
616 }
617
618 protected String getOriginalGroupName(String groupName) {
619 int pos = groupName.indexOf(CharPool.POUND);
620
621 return groupName.substring(pos + 1);
622 }
623
624 protected Trigger getQuartzTrigger(
625 com.liferay.portal.kernel.scheduler.Trigger trigger)
626 throws SchedulerException {
627
628 if (trigger == null) {
629 return null;
630 }
631
632 Date endDate = trigger.getEndDate();
633 String jobName = fixMaxLength(
634 trigger.getJobName(), JOB_NAME_MAX_LENGTH);
635 String groupName = fixMaxLength(
636 trigger.getGroupName(), GROUP_NAME_MAX_LENGTH);
637
638 Date startDate = trigger.getStartDate();
639
640 if (startDate == null) {
641 startDate = new Date(System.currentTimeMillis());
642 }
643
644 Trigger quartzTrigger = null;
645
646 TriggerType triggerType = trigger.getTriggerType();
647
648 if (triggerType.equals(TriggerType.CRON)) {
649 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
650
651 triggerBuilder.endAt(endDate);
652 triggerBuilder.forJob(jobName, groupName);
653 triggerBuilder.startAt(startDate);
654 triggerBuilder.withIdentity(jobName, groupName);
655
656 CronScheduleBuilder cronScheduleBuilder =
657 CronScheduleBuilder.cronSchedule(
658 (String)trigger.getTriggerContent());
659
660 triggerBuilder.withSchedule(cronScheduleBuilder);
661
662 quartzTrigger = triggerBuilder.build();
663 }
664 else if (triggerType.equals(TriggerType.SIMPLE)) {
665 long interval = (Long)trigger.getTriggerContent();
666
667 if (interval <= 0) {
668 if (_log.isDebugEnabled()) {
669 _log.debug(
670 "Not scheduling " + trigger.getJobName() +
671 " because interval is less than or equal to 0");
672 }
673
674 return null;
675 }
676
677 TriggerBuilder<Trigger>triggerBuilder = TriggerBuilder.newTrigger();
678
679 triggerBuilder.endAt(endDate);
680 triggerBuilder.forJob(jobName, groupName);
681 triggerBuilder.startAt(startDate);
682 triggerBuilder.withIdentity(jobName, groupName);
683
684 SimpleScheduleBuilder simpleScheduleBuilder =
685 SimpleScheduleBuilder.simpleSchedule();
686
687 simpleScheduleBuilder.withIntervalInMilliseconds(interval);
688 simpleScheduleBuilder.withRepeatCount(
689 SimpleTrigger.REPEAT_INDEFINITELY);
690
691 triggerBuilder.withSchedule(simpleScheduleBuilder);
692
693 quartzTrigger = triggerBuilder.build();
694 }
695 else {
696 throw new SchedulerException(
697 "Unknown trigger type " + trigger.getTriggerType());
698 }
699
700 return quartzTrigger;
701 }
702
703 protected SchedulerResponse getScheduledJob(
704 Scheduler scheduler, JobKey jobKey)
705 throws Exception {
706
707 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
708
709 if (jobDetail == null) {
710 return null;
711 }
712
713 JobDataMap jobDataMap = jobDetail.getJobDataMap();
714
715 String description = jobDataMap.getString(DESCRIPTION);
716 String destinationName = jobDataMap.getString(DESTINATION_NAME);
717 Message message = getMessage(jobDataMap);
718 StorageType storageType = StorageType.valueOf(
719 jobDataMap.getString(STORAGE_TYPE));
720
721 SchedulerResponse schedulerResponse = null;
722
723 String jobName = jobKey.getName();
724 String groupName = jobKey.getGroup();
725
726 TriggerKey triggerKey = new TriggerKey(jobName, groupName);
727
728 Trigger trigger = scheduler.getTrigger(triggerKey);
729
730 JobState jobState = getJobState(jobDataMap);
731
732 message.put(JOB_STATE, jobState);
733
734 if (trigger == null) {
735 schedulerResponse = new SchedulerResponse();
736
737 schedulerResponse.setDescription(description);
738 schedulerResponse.setDestinationName(destinationName);
739 schedulerResponse.setGroupName(groupName);
740 schedulerResponse.setJobName(jobName);
741 schedulerResponse.setMessage(message);
742 schedulerResponse.setStorageType(storageType);
743 }
744 else {
745 message.put(END_TIME, trigger.getEndTime());
746 message.put(FINAL_FIRE_TIME, trigger.getFinalFireTime());
747 message.put(NEXT_FIRE_TIME, trigger.getNextFireTime());
748 message.put(PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
749 message.put(START_TIME, trigger.getStartTime());
750
751 if (CronTrigger.class.isAssignableFrom(trigger.getClass())) {
752 CronTrigger cronTrigger = CronTrigger.class.cast(trigger);
753
754 schedulerResponse = new SchedulerResponse();
755
756 schedulerResponse.setDescription(description);
757 schedulerResponse.setDestinationName(destinationName);
758 schedulerResponse.setMessage(message);
759 schedulerResponse.setStorageType(storageType);
760 schedulerResponse.setTrigger(
761 new com.liferay.portal.kernel.scheduler.CronTrigger(
762 jobName, groupName, cronTrigger.getStartTime(),
763 cronTrigger.getEndTime(),
764 cronTrigger.getCronExpression()));
765 }
766 else if (SimpleTrigger.class.isAssignableFrom(trigger.getClass())) {
767 SimpleTrigger simpleTrigger = SimpleTrigger.class.cast(trigger);
768
769 schedulerResponse = new SchedulerResponse();
770
771 schedulerResponse.setDescription(description);
772 schedulerResponse.setDestinationName(destinationName);
773 schedulerResponse.setMessage(message);
774 schedulerResponse.setStorageType(storageType);
775 schedulerResponse.setTrigger(
776 new IntervalTrigger(
777 jobName, groupName, simpleTrigger.getStartTime(),
778 simpleTrigger.getEndTime(),
779 simpleTrigger.getRepeatInterval()));
780 }
781 }
782
783 return schedulerResponse;
784 }
785
786 protected List<SchedulerResponse> getScheduledJobs(
787 Scheduler scheduler, String groupName)
788 throws Exception {
789
790 groupName = fixMaxLength(
791 getOriginalGroupName(groupName), GROUP_NAME_MAX_LENGTH);
792
793 List<SchedulerResponse> schedulerResponses =
794 new ArrayList<SchedulerResponse>();
795
796 Set<JobKey> jobKeys = scheduler.getJobKeys(
797 GroupMatcher.jobGroupEquals(groupName));
798
799 for (JobKey jobKey : jobKeys) {
800 SchedulerResponse schedulerResponse = getScheduledJob(
801 scheduler, jobKey);
802
803 if (schedulerResponse != null) {
804 schedulerResponses.add(schedulerResponse);
805 }
806 }
807
808 return schedulerResponses;
809 }
810
811 protected Scheduler getScheduler(String groupName) throws Exception {
812 if (groupName.startsWith(StorageType.PERSISTED.toString())) {
813 return _persistedScheduler;
814 }
815 else {
816 return _memoryScheduler;
817 }
818 }
819
820 protected StorageType getStorageType(String groupName) {
821 int pos = groupName.indexOf(CharPool.POUND);
822
823 String storageTypeString = groupName.substring(0, pos);
824
825 return StorageType.valueOf(storageTypeString);
826 }
827
828 protected Scheduler initializeScheduler(
829 String propertiesPrefix, boolean useQuartzCluster)
830 throws Exception {
831
832 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
833
834 Properties properties = PropsUtil.getProperties(propertiesPrefix, true);
835
836 if (useQuartzCluster) {
837 DB db = DBFactoryUtil.getDB();
838
839 String dbType = db.getType();
840
841 if (dbType.equals(DB.TYPE_SQLSERVER)) {
842 properties.setProperty(
843 "org.quartz.jobStore.lockHandler.class",
844 UpdateLockRowSemaphore.class.getName());
845 }
846
847 if (PropsValues.CLUSTER_LINK_ENABLED) {
848 if (dbType.equals(DB.TYPE_HYPERSONIC)) {
849 _log.error("Unable to cluster scheduler on Hypersonic");
850 }
851 else {
852 properties.put(
853 "org.quartz.jobStore.isClustered",
854 Boolean.TRUE.toString());
855 }
856 }
857 }
858
859 schedulerFactory.initialize(properties);
860
861 return schedulerFactory.getScheduler();
862 }
863
864 protected void initJobState() throws Exception {
865 List<String> groupNames = _persistedScheduler.getJobGroupNames();
866
867 for (String groupName : groupNames) {
868 Set<JobKey> jobkeys = _persistedScheduler.getJobKeys(
869 GroupMatcher.jobGroupEquals(groupName));
870
871 for (JobKey jobKey : jobkeys) {
872 Trigger trigger = _persistedScheduler.getTrigger(
873 new TriggerKey(jobKey.getName(), jobKey.getGroup()));
874
875 if (trigger != null) {
876 continue;
877 }
878
879 JobDetail jobDetail = _persistedScheduler.getJobDetail(jobKey);
880
881 JobDataMap jobDataMap = jobDetail.getJobDataMap();
882
883 Message message = getMessage(jobDataMap);
884
885 message.put(JOB_NAME, jobKey.getName());
886 message.put(GROUP_NAME, jobKey.getGroup());
887
888 SchedulerEngineHelperUtil.auditSchedulerJobs(
889 message, TriggerState.EXPIRED);
890
891 _persistedScheduler.deleteJob(jobKey);
892 }
893 }
894 }
895
896 protected void registerMessageListeners(
897 String jobName, String groupName, String destinationName,
898 Message message)
899 throws SchedulerException {
900
901 String messageListenerClassName = message.getString(
902 MESSAGE_LISTENER_CLASS_NAME);
903
904 if (Validator.isNull(messageListenerClassName)) {
905 return;
906 }
907
908 String portletId = message.getString(PORTLET_ID);
909
910 ClassLoader classLoader = null;
911
912 if (Validator.isNull(portletId)) {
913 classLoader = ClassLoaderUtil.getPortalClassLoader();
914 }
915 else {
916 classLoader = PortletClassLoaderUtil.getClassLoader(portletId);
917
918
919
920
921 if (classLoader == null) {
922
923
924
925
926
927 classLoader = ClassLoaderPool.getClassLoader(portletId);
928 }
929 }
930
931 if (classLoader == null) {
932 throw new SchedulerException(
933 "Unable to find class loader for portlet " + portletId);
934 }
935
936 MessageListener schedulerEventListener = getMessageListener(
937 messageListenerClassName, classLoader);
938
939 SchedulerEventMessageListenerWrapper schedulerEventListenerWrapper =
940 new SchedulerEventMessageListenerWrapper();
941
942 schedulerEventListenerWrapper.setGroupName(groupName);
943 schedulerEventListenerWrapper.setJobName(jobName);
944 schedulerEventListenerWrapper.setMessageListener(
945 schedulerEventListener);
946
947 schedulerEventListenerWrapper.afterPropertiesSet();
948
949 MessageBusUtil.registerMessageListener(
950 destinationName, schedulerEventListenerWrapper);
951
952 message.put(
953 MESSAGE_LISTENER_UUID,
954 schedulerEventListenerWrapper.getMessageListenerUUID());
955
956 message.put(RECEIVER_KEY, getFullName(jobName, groupName));
957 }
958
959 protected void schedule(
960 Scheduler scheduler, StorageType storageType, Trigger trigger,
961 String description, String destinationName, Message message)
962 throws Exception {
963
964 try {
965 JobBuilder jobBuilder = JobBuilder.newJob(MessageSenderJob.class);
966
967 jobBuilder.withIdentity(trigger.getJobKey());
968
969 JobDetail jobDetail = jobBuilder.build();
970
971 JobDataMap jobDataMap = jobDetail.getJobDataMap();
972
973 jobDataMap.put(DESCRIPTION, description);
974 jobDataMap.put(DESTINATION_NAME, destinationName);
975 jobDataMap.put(MESSAGE, JSONFactoryUtil.serialize(message));
976 jobDataMap.put(STORAGE_TYPE, storageType.toString());
977
978 JobState jobState = new JobState(
979 TriggerState.NORMAL, message.getInteger(EXCEPTIONS_MAX_SIZE));
980
981 jobDataMap.put(
982 JOB_STATE, JobStateSerializeUtil.serialize(jobState));
983
984 unregisterMessageListener(scheduler, trigger.getJobKey());
985
986 synchronized (this) {
987 scheduler.deleteJob(trigger.getJobKey());
988 scheduler.scheduleJob(jobDetail, trigger);
989 }
990 }
991 catch (ObjectAlreadyExistsException oaee) {
992 if (_log.isInfoEnabled()) {
993 _log.info("Message is already scheduled");
994 }
995 }
996 }
997
998 protected void unregisterMessageListener(Scheduler scheduler, JobKey jobKey)
999 throws Exception {
1000
1001 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1002
1003 if (jobDetail == null) {
1004 return;
1005 }
1006
1007 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1008
1009 if (jobDataMap == null) {
1010 return;
1011 }
1012
1013 Message message = getMessage(jobDataMap);
1014
1015 String messageListenerUUID = message.getString(MESSAGE_LISTENER_UUID);
1016
1017 if (messageListenerUUID == null) {
1018 return;
1019 }
1020
1021 String destinationName = jobDataMap.getString(DESTINATION_NAME);
1022
1023 MessageBus messageBus = MessageBusUtil.getMessageBus();
1024
1025 Destination destination = messageBus.getDestination(destinationName);
1026
1027 Set<MessageListener> messageListeners =
1028 destination.getMessageListeners();
1029
1030 for (MessageListener messageListener : messageListeners) {
1031 if (!(messageListener instanceof InvokerMessageListener)) {
1032 continue;
1033 }
1034
1035 InvokerMessageListener invokerMessageListener =
1036 (InvokerMessageListener)messageListener;
1037
1038 messageListener = invokerMessageListener.getMessageListener();
1039
1040 if (!(messageListener instanceof
1041 SchedulerEventMessageListenerWrapper)) {
1042
1043 continue;
1044 }
1045
1046 SchedulerEventMessageListenerWrapper schedulerMessageListener =
1047 (SchedulerEventMessageListenerWrapper)messageListener;
1048
1049 if (messageListenerUUID.equals(
1050 schedulerMessageListener.getMessageListenerUUID())) {
1051
1052 messageBus.unregisterMessageListener(
1053 destinationName, schedulerMessageListener);
1054
1055 return;
1056 }
1057 }
1058 }
1059
1060 protected void unschedule(Scheduler scheduler, JobKey jobKey)
1061 throws Exception {
1062
1063 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1064
1065 TriggerKey triggerKey = new TriggerKey(
1066 jobKey.getName(), jobKey.getGroup());
1067
1068 if (jobDetail == null) {
1069 return;
1070 }
1071
1072 unregisterMessageListener(scheduler, jobKey);
1073
1074 if (scheduler == _memoryScheduler) {
1075 scheduler.unscheduleJob(triggerKey);
1076
1077 return;
1078 }
1079
1080 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1081
1082 JobState jobState = getJobState(jobDataMap);
1083
1084 Trigger trigger = scheduler.getTrigger(triggerKey);
1085
1086 jobState.setTriggerDate(END_TIME, new Date());
1087 jobState.setTriggerDate(FINAL_FIRE_TIME, trigger.getPreviousFireTime());
1088 jobState.setTriggerDate(NEXT_FIRE_TIME, null);
1089 jobState.setTriggerDate(
1090 PREVIOUS_FIRE_TIME, trigger.getPreviousFireTime());
1091 jobState.setTriggerDate(START_TIME, trigger.getStartTime());
1092
1093 jobState.setTriggerState(TriggerState.UNSCHEDULED);
1094
1095 jobState.clearExceptions();
1096
1097 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1098
1099 scheduler.unscheduleJob(triggerKey);
1100
1101 scheduler.addJob(jobDetail, true);
1102 }
1103
1104 protected void update(
1105 Scheduler scheduler,
1106 com.liferay.portal.kernel.scheduler.Trigger trigger)
1107 throws Exception {
1108
1109 Trigger quartzTrigger = getQuartzTrigger(trigger);
1110
1111 if (quartzTrigger == null) {
1112 return;
1113 }
1114
1115 TriggerKey triggerKey = quartzTrigger.getKey();
1116
1117 if (scheduler.getTrigger(triggerKey) != null) {
1118 scheduler.rescheduleJob(triggerKey, quartzTrigger);
1119 }
1120 else {
1121 JobKey jobKey = quartzTrigger.getJobKey();
1122
1123 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1124
1125 if (jobDetail == null) {
1126 return;
1127 }
1128
1129 updateJobState(scheduler, jobKey, TriggerState.NORMAL, true);
1130
1131 synchronized (this) {
1132 scheduler.deleteJob(jobKey);
1133 scheduler.scheduleJob(jobDetail, quartzTrigger);
1134 }
1135 }
1136 }
1137
1138 protected void updateJobState(
1139 Scheduler scheduler, JobKey jobKey, TriggerState triggerState,
1140 boolean suppressError)
1141 throws Exception {
1142
1143 JobDetail jobDetail = scheduler.getJobDetail(jobKey);
1144
1145 JobDataMap jobDataMap = jobDetail.getJobDataMap();
1146
1147 JobState jobState = getJobState(jobDataMap);
1148
1149 if (triggerState != null) {
1150 jobState.setTriggerState(triggerState);
1151 }
1152
1153 if (suppressError) {
1154 jobState.clearExceptions();
1155 }
1156
1157 jobDataMap.put(JOB_STATE, JobStateSerializeUtil.serialize(jobState));
1158
1159 scheduler.addJob(jobDetail, true);
1160 }
1161
1162 @BeanReference(name = "com.liferay.portal.service.QuartzLocalService")
1163 protected QuartzLocalService quartzLocalService;
1164
1165 private static Log _log = LogFactoryUtil.getLog(
1166 QuartzSchedulerEngine.class);
1167
1168 private Scheduler _memoryScheduler;
1169 private Scheduler _persistedScheduler;
1170
1171 }