001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018 import com.liferay.portal.kernel.bean.BeanReference;
019 import com.liferay.portal.kernel.bean.IdentifiableBean;
020 import com.liferay.portal.kernel.cluster.Address;
021 import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
022 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023 import com.liferay.portal.kernel.cluster.ClusterEvent;
024 import com.liferay.portal.kernel.cluster.ClusterEventListener;
025 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
028 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
029 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
030 import com.liferay.portal.kernel.cluster.ClusterRequest;
031 import com.liferay.portal.kernel.cluster.Clusterable;
032 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
033 import com.liferay.portal.kernel.log.Log;
034 import com.liferay.portal.kernel.log.LogFactoryUtil;
035 import com.liferay.portal.kernel.messaging.Message;
036 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
037 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
038 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
039 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
040 import com.liferay.portal.kernel.scheduler.SchedulerException;
041 import com.liferay.portal.kernel.scheduler.StorageType;
042 import com.liferay.portal.kernel.scheduler.Trigger;
043 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
044 import com.liferay.portal.kernel.scheduler.TriggerState;
045 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
046 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
047 import com.liferay.portal.kernel.util.CharPool;
048 import com.liferay.portal.kernel.util.MethodHandler;
049 import com.liferay.portal.kernel.util.MethodKey;
050 import com.liferay.portal.kernel.util.ObjectValuePair;
051 import com.liferay.portal.kernel.util.StringPool;
052 import com.liferay.portal.kernel.util.Validator;
053 import com.liferay.portal.model.Lock;
054 import com.liferay.portal.service.LockLocalServiceUtil;
055 import com.liferay.portal.util.PropsValues;
056
057 import java.io.Serializable;
058
059 import java.util.Iterator;
060 import java.util.List;
061 import java.util.Map;
062 import java.util.Set;
063 import java.util.concurrent.ConcurrentHashMap;
064 import java.util.concurrent.TimeUnit;
065 import java.util.concurrent.TimeoutException;
066 import java.util.concurrent.locks.ReadWriteLock;
067 import java.util.concurrent.locks.ReentrantReadWriteLock;
068
069
072 public class ClusterSchedulerEngine
073 implements IdentifiableBean, SchedulerEngine,
074 SchedulerEngineClusterManager {
075
076 public static SchedulerEngine createClusterSchedulerEngine(
077 SchedulerEngine schedulerEngine) {
078
079 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
080 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
081 }
082
083 return schedulerEngine;
084 }
085
/**
 * Creates a cluster scheduler engine that delegates to the given engine.
 *
 * @param schedulerEngine the wrapped engine that receives the delegated
 *        calls
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
089
090 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
091 @Override
092 public void delete(String groupName) throws SchedulerException {
093 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
094
095 _readLock.lock();
096
097 try {
098 if (memoryClusteredSlaveJob) {
099 removeMemoryClusteredJobs(groupName);
100 }
101 else {
102 _schedulerEngine.delete(groupName);
103 }
104 }
105 finally {
106 _readLock.unlock();
107 }
108
109 setClusterableThreadLocal(groupName);
110 }
111
112 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
113 @Override
114 public void delete(String jobName, String groupName)
115 throws SchedulerException {
116
117 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
118
119 _readLock.lock();
120
121 try {
122 if (memoryClusteredSlaveJob) {
123 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
124 }
125 else {
126 _schedulerEngine.delete(jobName, groupName);
127 }
128 }
129 finally {
130 _readLock.unlock();
131 }
132
133 setClusterableThreadLocal(groupName);
134 }
135
136 @Override
137 public String getBeanIdentifier() {
138 return _beanIdentifier;
139 }
140
141 @Override
142 public SchedulerResponse getScheduledJob(String jobName, String groupName)
143 throws SchedulerException {
144
145 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
146 groupName);
147
148 StorageType storageType = objectValuePair.getValue();
149
150 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
151 String masterAddressString = getMasterAddressString(false);
152
153 if (!_localClusterNodeAddress.equals(masterAddressString)) {
154 return callMaster(
155 masterAddressString, _getScheduledJobMethodKey, jobName,
156 objectValuePair.getKey(), storageType);
157 }
158 }
159
160 _readLock.lock();
161
162 try {
163 return _schedulerEngine.getScheduledJob(jobName, groupName);
164 }
165 finally {
166 _readLock.unlock();
167 }
168 }
169
170 @Override
171 public List<SchedulerResponse> getScheduledJobs()
172 throws SchedulerException {
173
174 String masterAddressString = getMasterAddressString(false);
175
176 if (!_localClusterNodeAddress.equals(masterAddressString)) {
177 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
178 }
179
180 _readLock.lock();
181
182 try {
183 return _schedulerEngine.getScheduledJobs();
184 }
185 finally {
186 _readLock.unlock();
187 }
188 }
189
190 @Override
191 public List<SchedulerResponse> getScheduledJobs(String groupName)
192 throws SchedulerException {
193
194 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
195 groupName);
196
197 StorageType storageType = objectValuePair.getValue();
198
199 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
200 String masterAddressString = getMasterAddressString(false);
201
202 if (!_localClusterNodeAddress.equals(masterAddressString)) {
203 return callMaster(
204 masterAddressString, _getScheduledJobsMethodKey2,
205 objectValuePair.getKey(), storageType);
206 }
207 }
208
209 _readLock.lock();
210
211 try {
212 return _schedulerEngine.getScheduledJobs(groupName);
213 }
214 finally {
215 _readLock.unlock();
216 }
217 }
218
219 @Override
220 public void initialize() throws SchedulerException {
221 try {
222 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
223
224 _readLock = readWriteLock.readLock();
225 _writeLock = readWriteLock.writeLock();
226
227 _localClusterNodeAddress = AddressSerializerUtil.serialize(
228 ClusterExecutorUtil.getLocalClusterNodeAddress());
229
230 _clusterEventListener = new MemorySchedulerClusterEventListener();
231
232 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
233
234 String masterAddressString = getMasterAddressString(false);
235
236 if (!_localClusterNodeAddress.equals(masterAddressString)) {
237 List<SchedulerResponse> schedulerResponses = callMaster(
238 masterAddressString, _getScheduledJobsMethodKey3,
239 StorageType.MEMORY_CLUSTERED);
240
241 initMemoryClusteredJobs(schedulerResponses);
242 }
243 }
244 catch (Exception e) {
245 throw new SchedulerException("Unable to initialize scheduler", e);
246 }
247 }
248
249 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
250 @Override
251 public void pause(String groupName) throws SchedulerException {
252 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
253
254 _readLock.lock();
255
256 try {
257 if (memoryClusteredSlaveJob) {
258 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
259 }
260 else {
261 _schedulerEngine.pause(groupName);
262 }
263 }
264 finally {
265 _readLock.unlock();
266 }
267
268 setClusterableThreadLocal(groupName);
269 }
270
271 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
272 @Override
273 public void pause(String jobName, String groupName)
274 throws SchedulerException {
275
276 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
277
278 _readLock.lock();
279
280 try {
281 if (memoryClusteredSlaveJob) {
282 updateMemoryClusteredJob(
283 jobName, groupName, TriggerState.PAUSED);
284 }
285 else {
286 _schedulerEngine.pause(jobName, groupName);
287 }
288 }
289 finally {
290 _readLock.unlock();
291 }
292
293 setClusterableThreadLocal(groupName);
294 }
295
296 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
297 @Override
298 public void resume(String groupName) throws SchedulerException {
299 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
300
301 _readLock.lock();
302
303 try {
304 if (memoryClusteredSlaveJob) {
305 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
306 }
307 else {
308 _schedulerEngine.resume(groupName);
309 }
310 }
311 finally {
312 _readLock.unlock();
313 }
314
315 setClusterableThreadLocal(groupName);
316 }
317
318 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
319 @Override
320 public void resume(String jobName, String groupName)
321 throws SchedulerException {
322
323 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
324
325 _readLock.lock();
326
327 try {
328 if (memoryClusteredSlaveJob) {
329 updateMemoryClusteredJob(
330 jobName, groupName, TriggerState.NORMAL);
331 }
332 else {
333 _schedulerEngine.resume(jobName, groupName);
334 }
335 }
336 finally {
337 _readLock.unlock();
338 }
339
340 setClusterableThreadLocal(groupName);
341 }
342
343 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
344 @Override
345 public void schedule(
346 Trigger trigger, String description, String destinationName,
347 Message message)
348 throws SchedulerException {
349
350 String groupName = trigger.getGroupName();
351 String jobName = trigger.getJobName();
352
353 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
354
355 _readLock.lock();
356
357 try {
358 if (memoryClusteredSlaveJob) {
359 SchedulerResponse schedulerResponse = new SchedulerResponse();
360
361 schedulerResponse.setDescription(description);
362 schedulerResponse.setDestinationName(destinationName);
363 schedulerResponse.setGroupName(groupName);
364 schedulerResponse.setJobName(jobName);
365 schedulerResponse.setMessage(message);
366 schedulerResponse.setTrigger(trigger);
367
368 _memoryClusteredJobs.put(
369 getFullName(jobName, groupName),
370 new ObjectValuePair<SchedulerResponse, TriggerState>(
371 schedulerResponse, TriggerState.NORMAL));
372 }
373 else {
374 _schedulerEngine.schedule(
375 trigger, description, destinationName, message);
376 }
377 }
378 finally {
379 _readLock.unlock();
380 }
381
382 setClusterableThreadLocal(groupName);
383 }
384
385 @Override
386 public void setBeanIdentifier(String beanIdentifier) {
387 _beanIdentifier = beanIdentifier;
388 }
389
390 @Override
391 public void shutdown() throws SchedulerException {
392 _portalReady = false;
393
394 try {
395 ClusterExecutorUtil.removeClusterEventListener(
396 _clusterEventListener);
397
398 LockLocalServiceUtil.unlock(
399 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
400 }
401 catch (Exception e) {
402 throw new SchedulerException("Unable to shutdown scheduler", e);
403 }
404
405 _schedulerEngine.shutdown();
406 }
407
408 @Override
409 public void start() throws SchedulerException {
410 _schedulerEngine.start();
411
412 _portalReady = true;
413 }
414
415 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
416 @Override
417 public void suppressError(String jobName, String groupName)
418 throws SchedulerException {
419
420 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
421
422 if (!memoryClusteredSlaveJob) {
423 _readLock.lock();
424
425 try {
426 _schedulerEngine.suppressError(jobName, groupName);
427 }
428 finally {
429 _readLock.unlock();
430 }
431 }
432
433 setClusterableThreadLocal(groupName);
434 }
435
436 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
437 @Override
438 public void unschedule(String groupName) throws SchedulerException {
439 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
440
441 _readLock.lock();
442
443 try {
444 if (memoryClusteredSlaveJob) {
445 removeMemoryClusteredJobs(groupName);
446 }
447 else {
448 _schedulerEngine.unschedule(groupName);
449 }
450 }
451 finally {
452 _readLock.unlock();
453 }
454
455 setClusterableThreadLocal(groupName);
456 }
457
458 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
459 @Override
460 public void unschedule(String jobName, String groupName)
461 throws SchedulerException {
462
463 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
464
465 _readLock.lock();
466
467 try {
468 if (memoryClusteredSlaveJob) {
469 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
470 }
471 else {
472 _schedulerEngine.unschedule(jobName, groupName);
473 }
474 }
475 finally {
476 _readLock.unlock();
477 }
478
479 setClusterableThreadLocal(groupName);
480 }
481
482 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
483 @Override
484 public void update(Trigger trigger) throws SchedulerException {
485 String jobName = trigger.getJobName();
486 String groupName = trigger.getGroupName();
487
488 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
489
490 _readLock.lock();
491
492 try {
493 if (memoryClusteredSlaveJob) {
494 boolean updated = false;
495
496 for (ObjectValuePair<SchedulerResponse, TriggerState>
497 memoryClusteredJob : _memoryClusteredJobs.values()) {
498
499 SchedulerResponse schedulerResponse =
500 memoryClusteredJob.getKey();
501
502 if (jobName.equals(schedulerResponse.getJobName()) &&
503 groupName.equals(schedulerResponse.getGroupName())) {
504
505 schedulerResponse.setTrigger(trigger);
506
507 updated = true;
508
509 break;
510 }
511 }
512
513 if (!updated) {
514 throw new SchedulerException(
515 "Unable to update trigger for memory clustered job");
516 }
517 }
518 else {
519 _schedulerEngine.update(trigger);
520 }
521 }
522 finally {
523 _readLock.unlock();
524 }
525
526 setClusterableThreadLocal(groupName);
527 }
528
529 @Override
530 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
531 getMasterAddressString(false);
532
533 return null;
534 }
535
536 protected <T> T callMaster(
537 String masterAddressString, MethodKey methodKey,
538 Object... arguments)
539 throws SchedulerException {
540
541 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
542
543 Address address = AddressSerializerUtil.deserialize(
544 masterAddressString);
545
546 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
547 methodHandler, address);
548
549 try {
550 FutureClusterResponses futureClusterResponses =
551 ClusterExecutorUtil.execute(clusterRequest);
552
553 ClusterNodeResponses clusterNodeResponses =
554 futureClusterResponses.get(20, TimeUnit.SECONDS);
555
556 ClusterNodeResponse clusterNodeResponse =
557 clusterNodeResponses.getClusterResponse(address);
558
559 return (T)clusterNodeResponse.getResult();
560 }
561 catch (Exception e) {
562 throw new SchedulerException(
563 "Unable to load scheduled jobs from cluster node " +
564 address.getDescription(),
565 e);
566 }
567 }
568
569 protected String getFullName(String jobName, String groupName) {
570 return groupName.concat(StringPool.PERIOD).concat(jobName);
571 }
572
573 protected String getMasterAddressString(boolean asynchronous)
574 throws SchedulerException {
575
576 String owner = null;
577
578 while (true) {
579 try {
580 Lock lock = null;
581
582 if (owner == null) {
583 lock = LockLocalServiceUtil.lock(
584 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
585 _localClusterNodeAddress);
586 }
587 else {
588 lock = LockLocalServiceUtil.lock(
589 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
590 _localClusterNodeAddress);
591 }
592
593 owner = lock.getOwner();
594
595 Address address = AddressSerializerUtil.deserialize(owner);
596
597 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
598 break;
599 }
600 }
601 catch (Exception e) {
602 if (_log.isWarnEnabled()) {
603 _log.warn(
604 "Could not acquire memory scheduler cluster lock", e);
605 }
606 }
607
608 if (_log.isInfoEnabled()) {
609 _log.info("Could not acquire scheduler cluster lock, retrying");
610
611 if (Validator.isNotNull(owner)) {
612 _log.info("Lock currently held by " + owner);
613 }
614 }
615 }
616
617 boolean master = _localClusterNodeAddress.equals(owner);
618
619 if (master == _master) {
620 return owner;
621 }
622
623 if (master) {
624 slaveToMaster();
625 }
626 else {
627 masterToSlave(owner, asynchronous);
628 }
629
630 return owner;
631 }
632
633 protected void initMemoryClusteredJobs(
634 List<SchedulerResponse> schedulerResponses)
635 throws Exception {
636
637 for (SchedulerResponse schedulerResponse : schedulerResponses) {
638 Trigger oldTrigger = schedulerResponse.getTrigger();
639
640 String jobName = schedulerResponse.getJobName();
641 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
642 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
643
644 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
645 oldTrigger.getTriggerType(), jobName, groupName,
646 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
647 oldTrigger.getTriggerContent());
648
649 schedulerResponse.setTrigger(newTrigger);
650
651 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
652 schedulerResponse);
653
654 Message message = schedulerResponse.getMessage();
655
656 message.remove(JOB_STATE);
657
658 _memoryClusteredJobs.put(
659 getFullName(jobName, groupName),
660 new ObjectValuePair<SchedulerResponse, TriggerState>(
661 schedulerResponse, triggerState));
662 }
663 }
664
665 protected boolean isMemoryClusteredSlaveJob(String groupName)
666 throws SchedulerException {
667
668 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
669 groupName);
670
671 StorageType storageType = objectValuePair.getValue();
672
673 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
674 return false;
675 }
676
677 String masterAddressString = getMasterAddressString(false);
678
679 if (_localClusterNodeAddress.equals(masterAddressString)) {
680 return false;
681 }
682
683 return true;
684 }
685
686 protected void masterToSlave(
687 String masterAddressString, boolean asynchronous)
688 throws SchedulerException {
689
690 if (asynchronous) {
691 MethodHandler methodHandler = new MethodHandler(
692 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
693
694 Address address = AddressSerializerUtil.deserialize(
695 masterAddressString);
696
697 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
698 methodHandler, address);
699
700 try {
701 ClusterExecutorUtil.execute(
702 clusterRequest,
703 new MemorySchedulerClusterResponseCallback(address), 20,
704 TimeUnit.SECONDS);
705
706 return;
707 }
708 catch (Exception e) {
709 throw new SchedulerException(
710 "Unable to load scheduled jobs from cluster node " +
711 address.getDescription(),
712 e);
713 }
714 }
715
716 List<SchedulerResponse> schedulerResponses = callMaster(
717 masterAddressString, _getScheduledJobsMethodKey3,
718 StorageType.MEMORY_CLUSTERED);
719
720 _doMasterToSlave(schedulerResponses);
721 }
722
723 protected void removeMemoryClusteredJobs(String groupName) {
724 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
725 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
726
727 Iterator
728 <Map.Entry<String,
729 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
730 memoryClusteredJobs.iterator();
731
732 while (itr.hasNext()) {
733 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
734 entry = itr.next();
735
736 ObjectValuePair<SchedulerResponse, TriggerState>
737 memoryClusteredJob = entry.getValue();
738
739 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
740
741 if (groupName.equals(schedulerResponse.getGroupName())) {
742 itr.remove();
743 }
744 }
745 }
746
747 protected ObjectValuePair<String, StorageType> resolveGroupName(
748 String groupName) {
749
750 int index = groupName.indexOf(CharPool.POUND);
751
752 String storageTypeString = groupName.substring(0, index);
753
754 StorageType storageType = StorageType.valueOf(storageTypeString);
755
756 String orginalGroupName = groupName.substring(index + 1);
757
758 return new ObjectValuePair<String, StorageType>(
759 orginalGroupName, storageType);
760 }
761
762 protected void setClusterableThreadLocal(String groupName) {
763 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
764 groupName);
765
766 ClusterableContextThreadLocal.putThreadLocalContext(
767 STORAGE_TYPE, objectValuePair.getValue());
768 ClusterableContextThreadLocal.putThreadLocalContext(
769 _PORTAL_READY, _portalReady);
770
771 boolean pluginReady = true;
772
773 if (PluginContextLifecycleThreadLocal.isInitializing() ||
774 PluginContextLifecycleThreadLocal.isDestroying()) {
775
776 pluginReady = false;
777 }
778
779 ClusterableContextThreadLocal.putThreadLocalContext(
780 _PLUGIN_READY, pluginReady);
781 }
782
783 protected void slaveToMaster() throws SchedulerException {
784 boolean forceSync = ProxyModeThreadLocal.isForceSync();
785
786 ProxyModeThreadLocal.setForceSync(true);
787
788 _writeLock.lock();
789
790 try {
791 for (ObjectValuePair<SchedulerResponse, TriggerState>
792 memoryClusteredJob : _memoryClusteredJobs.values()) {
793
794 SchedulerResponse schedulerResponse =
795 memoryClusteredJob.getKey();
796
797 _schedulerEngine.schedule(
798 schedulerResponse.getTrigger(),
799 schedulerResponse.getDescription(),
800 schedulerResponse.getDestinationName(),
801 schedulerResponse.getMessage());
802
803 TriggerState triggerState = memoryClusteredJob.getValue();
804
805 if (triggerState.equals(TriggerState.PAUSED)) {
806 _schedulerEngine.pause(
807 schedulerResponse.getJobName(),
808 schedulerResponse.getGroupName());
809 }
810 }
811
812 _memoryClusteredJobs.clear();
813 }
814 finally {
815 ProxyModeThreadLocal.setForceSync(forceSync);
816
817 _master = true;
818
819 _writeLock.unlock();
820 }
821 }
822
823 protected void updateMemoryClusteredJob(
824 String jobName, String groupName, TriggerState triggerState) {
825
826 ObjectValuePair<SchedulerResponse, TriggerState>
827 memoryClusteredJob = _memoryClusteredJobs.get(
828 getFullName(jobName, groupName));
829
830 if (memoryClusteredJob != null) {
831 memoryClusteredJob.setValue(triggerState);
832 }
833 }
834
835 protected void updateMemoryClusteredJobs(
836 String groupName, TriggerState triggerState) {
837
838 for (ObjectValuePair<SchedulerResponse, TriggerState>
839 memoryClusteredJob : _memoryClusteredJobs.values()) {
840
841 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
842
843 if (groupName.equals(schedulerResponse.getGroupName())) {
844 memoryClusteredJob.setValue(triggerState);
845 }
846 }
847 }
848
849 @BeanReference(
850 name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
851 protected SchedulerEngine schedulerEngine;
852
853 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
854 throws SchedulerException {
855
856 _writeLock.lock();
857
858 try {
859 for (SchedulerResponse schedulerResponse :
860 _schedulerEngine.getScheduledJobs()) {
861
862 if (StorageType.MEMORY_CLUSTERED ==
863 schedulerResponse.getStorageType()) {
864
865 String groupName = StorageType.MEMORY_CLUSTERED.toString();
866
867 groupName = groupName.concat(StringPool.POUND).concat(
868 schedulerResponse.getGroupName());
869
870 _schedulerEngine.delete(
871 schedulerResponse.getJobName(), groupName);
872 }
873 }
874
875 initMemoryClusteredJobs(schedulerResponses);
876
877 if (_log.isInfoEnabled()) {
878 _log.info("Switched current node from master to slave");
879 }
880 }
881 catch (Exception e) {
882 throw new SchedulerException(e);
883 }
884 finally {
885 _master = false;
886
887 _writeLock.unlock();
888 }
889 }
890
891 private static final String _LOCK_CLASS_NAME =
892 SchedulerEngine.class.getName();
893
894 private static final String _PLUGIN_READY = "plugin.ready";
895
896 private static final String _PORTAL_READY = "portal.ready";
897
898 private static Log _log = LogFactoryUtil.getLog(
899 ClusterSchedulerEngine.class);
900
901 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
902 SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
903 String.class, StorageType.class);
904 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
905 SchedulerEngineHelperUtil.class, "getScheduledJobs");
906 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
907 SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
908 StorageType.class);
909 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
910 SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);
911
912 private String _beanIdentifier;
913 private ClusterEventListener _clusterEventListener;
914 private volatile String _localClusterNodeAddress;
915 private volatile boolean _master;
916 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
917 _memoryClusteredJobs = new ConcurrentHashMap
918 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
919 private boolean _portalReady;
920 private java.util.concurrent.locks.Lock _readLock;
921 private SchedulerEngine _schedulerEngine;
922 private java.util.concurrent.locks.Lock _writeLock;
923
924 private static class SchedulerClusterInvokeAcceptor
925 implements ClusterInvokeAcceptor {
926
927 @Override
928 public boolean accept(Map<String, Serializable> context) {
929 if (ClusterInvokeThreadLocal.isEnabled()) {
930 return true;
931 }
932
933 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
934 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
935 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
936
937 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
938 !pluginReady) {
939
940 return false;
941 }
942
943 return true;
944 }
945
946 }
947
948 private class MemorySchedulerClusterEventListener
949 implements ClusterEventListener {
950
951 @Override
952 public void processClusterEvent(ClusterEvent clusterEvent) {
953 try {
954 getMasterAddressString(true);
955 }
956 catch (Exception e) {
957 _log.error("Unable to update memory scheduler cluster lock", e);
958 }
959 }
960
961 }
962
963 private class MemorySchedulerClusterResponseCallback
964 extends BaseClusterResponseCallback {
965
966 public MemorySchedulerClusterResponseCallback(Address address) {
967 _address = address;
968 }
969
970 @Override
971 public void callback(ClusterNodeResponses clusterNodeResponses) {
972 try {
973 ClusterNodeResponse clusterNodeResponse =
974 clusterNodeResponses.getClusterResponse(_address);
975
976 List<SchedulerResponse> schedulerResponses =
977 (List<SchedulerResponse>)clusterNodeResponse.getResult();
978
979 _doMasterToSlave(schedulerResponses);
980 }
981 catch (Exception e) {
982 _log.error(
983 "Unable to load memory clustered jobs from cluster node " +
984 _address.getDescription(),
985 e);
986 }
987 }
988
989 @Override
990 public void processTimeoutException(TimeoutException timeoutException) {
991 _log.error(
992 "Unable to load memory clustered jobs from cluster node " +
993 _address.getDescription(),
994 timeoutException);
995 }
996
997 private Address _address;
998
999 }
1000
1001 }