001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.cluster.ClusterInvokeThreadLocal;
018 import com.liferay.portal.cluster.ClusterableContextThreadLocal;
019 import com.liferay.portal.kernel.bean.BeanReference;
020 import com.liferay.portal.kernel.bean.IdentifiableBean;
021 import com.liferay.portal.kernel.cluster.Address;
022 import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023 import com.liferay.portal.kernel.cluster.ClusterEvent;
024 import com.liferay.portal.kernel.cluster.ClusterEventListener;
025 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026 import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
028 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
029 import com.liferay.portal.kernel.cluster.ClusterRequest;
030 import com.liferay.portal.kernel.cluster.Clusterable;
031 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
032 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
033 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
034 import com.liferay.portal.kernel.log.Log;
035 import com.liferay.portal.kernel.log.LogFactoryUtil;
036 import com.liferay.portal.kernel.messaging.Message;
037 import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
038 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
039 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
040 import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
041 import com.liferay.portal.kernel.scheduler.SchedulerException;
042 import com.liferay.portal.kernel.scheduler.StorageType;
043 import com.liferay.portal.kernel.scheduler.Trigger;
044 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
045 import com.liferay.portal.kernel.scheduler.TriggerState;
046 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
047 import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
048 import com.liferay.portal.kernel.util.Base64;
049 import com.liferay.portal.kernel.util.CharPool;
050 import com.liferay.portal.kernel.util.MethodHandler;
051 import com.liferay.portal.kernel.util.MethodKey;
052 import com.liferay.portal.kernel.util.ObjectValuePair;
053 import com.liferay.portal.kernel.util.StringPool;
054 import com.liferay.portal.model.Lock;
055 import com.liferay.portal.service.LockLocalServiceUtil;
056 import com.liferay.portal.util.PropsValues;
057
058 import java.io.ObjectInputStream;
059 import java.io.ObjectOutputStream;
060 import java.io.Serializable;
061
062 import java.util.Iterator;
063 import java.util.List;
064 import java.util.Map;
065 import java.util.Set;
066 import java.util.concurrent.ConcurrentHashMap;
067 import java.util.concurrent.TimeUnit;
068 import java.util.concurrent.TimeoutException;
069 import java.util.concurrent.locks.ReadWriteLock;
070 import java.util.concurrent.locks.ReentrantReadWriteLock;
071
072
075 public class ClusterSchedulerEngine
076 implements IdentifiableBean, SchedulerEngine,
077 SchedulerEngineClusterManager {
078
079 public static SchedulerEngine createClusterSchedulerEngine(
080 SchedulerEngine schedulerEngine) {
081
082 if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
083 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
084 }
085
086 return schedulerEngine;
087 }
088
/**
 * Creates a cluster-aware engine that delegates to the given engine.
 *
 * @param schedulerEngine the engine every non-cached operation is
 *        forwarded to
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
092
093 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
094 @Override
095 public void delete(String groupName) throws SchedulerException {
096 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
097
098 _readLock.lock();
099
100 try {
101 if (memoryClusteredSlaveJob) {
102 removeMemoryClusteredJobs(groupName);
103 }
104 else {
105 _schedulerEngine.delete(groupName);
106 }
107 }
108 finally {
109 _readLock.unlock();
110 }
111
112 setClusterableThreadLocal(groupName);
113 }
114
115 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
116 @Override
117 public void delete(String jobName, String groupName)
118 throws SchedulerException {
119
120 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
121
122 _readLock.lock();
123
124 try {
125 if (memoryClusteredSlaveJob) {
126 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
127 }
128 else {
129 _schedulerEngine.delete(jobName, groupName);
130 }
131 }
132 finally {
133 _readLock.unlock();
134 }
135
136 setClusterableThreadLocal(groupName);
137 }
138
139 @Override
140 public String getBeanIdentifier() {
141 return _beanIdentifier;
142 }
143
144 @Override
145 public SchedulerResponse getScheduledJob(String jobName, String groupName)
146 throws SchedulerException {
147
148 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
149 groupName);
150
151 StorageType storageType = objectValuePair.getValue();
152
153 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
154 String masterAddressString = getMasterAddressString(false);
155
156 if (!_localClusterNodeAddress.equals(masterAddressString)) {
157 return (SchedulerResponse)callMaster(
158 masterAddressString, _getScheduledJobMethodKey, jobName,
159 objectValuePair.getKey(), storageType);
160 }
161 }
162
163 _readLock.lock();
164
165 try {
166 return _schedulerEngine.getScheduledJob(jobName, groupName);
167 }
168 finally {
169 _readLock.unlock();
170 }
171 }
172
173 @Override
174 public List<SchedulerResponse> getScheduledJobs()
175 throws SchedulerException {
176
177 String masterAddressString = getMasterAddressString(false);
178
179 if (!_localClusterNodeAddress.equals(masterAddressString)) {
180 return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
181 }
182
183 _readLock.lock();
184
185 try {
186 return _schedulerEngine.getScheduledJobs();
187 }
188 finally {
189 _readLock.unlock();
190 }
191 }
192
193 @Override
194 public List<SchedulerResponse> getScheduledJobs(String groupName)
195 throws SchedulerException {
196
197 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
198 groupName);
199
200 StorageType storageType = objectValuePair.getValue();
201
202 if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
203 String masterAddressString = getMasterAddressString(false);
204
205 if (!_localClusterNodeAddress.equals(masterAddressString)) {
206 return callMaster(
207 masterAddressString, _getScheduledJobsMethodKey2,
208 objectValuePair.getKey(), storageType);
209 }
210 }
211
212 _readLock.lock();
213
214 try {
215 return _schedulerEngine.getScheduledJobs(groupName);
216 }
217 finally {
218 _readLock.unlock();
219 }
220 }
221
222 @Override
223 public void initialize() throws SchedulerException {
224 try {
225 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
226
227 _readLock = readWriteLock.readLock();
228 _writeLock = readWriteLock.writeLock();
229
230 _localClusterNodeAddress = getSerializedString(
231 ClusterExecutorUtil.getLocalClusterNodeAddress());
232
233 _clusterEventListener = new MemorySchedulerClusterEventListener();
234
235 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
236
237 String masterAddressString = getMasterAddressString(false);
238
239 if (!_localClusterNodeAddress.equals(masterAddressString)) {
240 List<SchedulerResponse> schedulerResponses = callMaster(
241 masterAddressString, _getScheduledJobsMethodKey3,
242 StorageType.MEMORY_CLUSTERED);
243
244 initMemoryClusteredJobs(schedulerResponses);
245 }
246 }
247 catch (Exception e) {
248 throw new SchedulerException("Unable to initialize scheduler", e);
249 }
250 }
251
252 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
253 @Override
254 public void pause(String groupName) throws SchedulerException {
255 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
256
257 _readLock.lock();
258
259 try {
260 if (memoryClusteredSlaveJob) {
261 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
262 }
263 else {
264 _schedulerEngine.pause(groupName);
265 }
266 }
267 finally {
268 _readLock.unlock();
269 }
270
271 setClusterableThreadLocal(groupName);
272 }
273
274 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
275 @Override
276 public void pause(String jobName, String groupName)
277 throws SchedulerException {
278
279 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
280
281 _readLock.lock();
282
283 try {
284 if (memoryClusteredSlaveJob) {
285 updateMemoryClusteredJob(
286 jobName, groupName, TriggerState.PAUSED);
287 }
288 else {
289 _schedulerEngine.pause(jobName, groupName);
290 }
291 }
292 finally {
293 _readLock.unlock();
294 }
295
296 setClusterableThreadLocal(groupName);
297 }
298
299 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
300 @Override
301 public void resume(String groupName) throws SchedulerException {
302 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
303
304 _readLock.lock();
305
306 try {
307 if (memoryClusteredSlaveJob) {
308 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
309 }
310 else {
311 _schedulerEngine.resume(groupName);
312 }
313 }
314 finally {
315 _readLock.unlock();
316 }
317
318 setClusterableThreadLocal(groupName);
319 }
320
321 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
322 @Override
323 public void resume(String jobName, String groupName)
324 throws SchedulerException {
325
326 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
327
328 _readLock.lock();
329
330 try {
331 if (memoryClusteredSlaveJob) {
332 updateMemoryClusteredJob(
333 jobName, groupName, TriggerState.NORMAL);
334 }
335 else {
336 _schedulerEngine.resume(jobName, groupName);
337 }
338 }
339 finally {
340 _readLock.unlock();
341 }
342
343 setClusterableThreadLocal(groupName);
344 }
345
346 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
347 @Override
348 public void schedule(
349 Trigger trigger, String description, String destinationName,
350 Message message)
351 throws SchedulerException {
352
353 String groupName = trigger.getGroupName();
354 String jobName = trigger.getJobName();
355
356 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
357
358 _readLock.lock();
359
360 try {
361 if (memoryClusteredSlaveJob) {
362 SchedulerResponse schedulerResponse = new SchedulerResponse();
363
364 schedulerResponse.setDescription(description);
365 schedulerResponse.setDestinationName(destinationName);
366 schedulerResponse.setGroupName(groupName);
367 schedulerResponse.setJobName(jobName);
368 schedulerResponse.setMessage(message);
369 schedulerResponse.setTrigger(trigger);
370
371 _memoryClusteredJobs.put(
372 getFullName(jobName, groupName),
373 new ObjectValuePair<SchedulerResponse, TriggerState>(
374 schedulerResponse, TriggerState.NORMAL));
375 }
376 else {
377 _schedulerEngine.schedule(
378 trigger, description, destinationName, message);
379 }
380 }
381 finally {
382 _readLock.unlock();
383 }
384
385 setClusterableThreadLocal(groupName);
386 }
387
388 @Override
389 public void setBeanIdentifier(String beanIdentifier) {
390 _beanIdentifier = beanIdentifier;
391 }
392
393 @Override
394 public void shutdown() throws SchedulerException {
395 _portalReady = false;
396
397 try {
398 ClusterExecutorUtil.removeClusterEventListener(
399 _clusterEventListener);
400
401 LockLocalServiceUtil.unlock(
402 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
403 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
404 }
405 catch (Exception e) {
406 throw new SchedulerException("Unable to shutdown scheduler", e);
407 }
408
409 _schedulerEngine.shutdown();
410 }
411
412 @Override
413 public void start() throws SchedulerException {
414 _schedulerEngine.start();
415
416 _portalReady = true;
417 }
418
419 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
420 @Override
421 public void suppressError(String jobName, String groupName)
422 throws SchedulerException {
423
424 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
425
426 if (!memoryClusteredSlaveJob) {
427 _readLock.lock();
428
429 try {
430 _schedulerEngine.suppressError(jobName, groupName);
431 }
432 finally {
433 _readLock.unlock();
434 }
435 }
436
437 setClusterableThreadLocal(groupName);
438 }
439
440 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
441 @Override
442 public void unschedule(String groupName) throws SchedulerException {
443 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
444
445 _readLock.lock();
446
447 try {
448 if (memoryClusteredSlaveJob) {
449 removeMemoryClusteredJobs(groupName);
450 }
451 else {
452 _schedulerEngine.unschedule(groupName);
453 }
454 }
455 finally {
456 _readLock.unlock();
457 }
458
459 setClusterableThreadLocal(groupName);
460 }
461
462 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
463 @Override
464 public void unschedule(String jobName, String groupName)
465 throws SchedulerException {
466
467 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
468
469 _readLock.lock();
470
471 try {
472 if (memoryClusteredSlaveJob) {
473 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
474 }
475 else {
476 _schedulerEngine.unschedule(jobName, groupName);
477 }
478 }
479 finally {
480 _readLock.unlock();
481 }
482
483 setClusterableThreadLocal(groupName);
484 }
485
486 @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
487 @Override
488 public void update(Trigger trigger) throws SchedulerException {
489 String jobName = trigger.getJobName();
490 String groupName = trigger.getGroupName();
491
492 boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
493
494 _readLock.lock();
495
496 try {
497 if (memoryClusteredSlaveJob) {
498 boolean updated = false;
499
500 for (ObjectValuePair<SchedulerResponse, TriggerState>
501 memoryClusteredJob : _memoryClusteredJobs.values()) {
502
503 SchedulerResponse schedulerResponse =
504 memoryClusteredJob.getKey();
505
506 if (jobName.equals(schedulerResponse.getJobName()) &&
507 groupName.equals(schedulerResponse.getGroupName())) {
508
509 schedulerResponse.setTrigger(trigger);
510
511 updated = true;
512
513 break;
514 }
515 }
516
517 if (!updated) {
518 throw new SchedulerException(
519 "Unable to update trigger for memory clustered job");
520 }
521 }
522 else {
523 _schedulerEngine.update(trigger);
524 }
525 }
526 finally {
527 _readLock.unlock();
528 }
529
530 setClusterableThreadLocal(groupName);
531 }
532
533 @Override
534 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
535 getMasterAddressString(false);
536
537 return null;
538 }
539
540 protected <T> T callMaster(
541 String masterAddressString, MethodKey methodKey,
542 Object... arguments)
543 throws SchedulerException {
544
545 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
546
547 Address address = (Address)getDeserializedObject(masterAddressString);
548
549 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
550 methodHandler, address);
551
552 try {
553 FutureClusterResponses futureClusterResponses =
554 ClusterExecutorUtil.execute(clusterRequest);
555
556 ClusterNodeResponses clusterNodeResponses =
557 futureClusterResponses.get(20, TimeUnit.SECONDS);
558
559 ClusterNodeResponse clusterNodeResponse =
560 clusterNodeResponses.getClusterResponse(address);
561
562 return (T)clusterNodeResponse.getResult();
563 }
564 catch (Exception e) {
565 throw new SchedulerException(
566 "Unable to load scheduled jobs from cluster node " +
567 address.getDescription(),
568 e);
569 }
570 }
571
572 protected Object getDeserializedObject(String string)
573 throws SchedulerException {
574
575 byte[] bytes = Base64.decode(string);
576
577 UnsyncByteArrayInputStream byteArrayInputStream =
578 new UnsyncByteArrayInputStream(bytes);
579
580 ObjectInputStream objectInputStream = null;
581
582 try {
583 objectInputStream = new ObjectInputStream(byteArrayInputStream);
584
585 Object object = objectInputStream.readObject();
586
587 return object;
588 }
589 catch (Exception e) {
590 throw new SchedulerException(
591 "Unable to deserialize object from " + string, e);
592 }
593 finally {
594 try {
595 objectInputStream.close();
596 }
597 catch (Exception e) {
598 }
599 }
600 }
601
602 protected String getFullName(String jobName, String groupName) {
603 return groupName.concat(StringPool.PERIOD).concat(jobName);
604 }
605
606 protected String getMasterAddressString(boolean asynchronous)
607 throws SchedulerException {
608
609 String owner = null;
610
611 Lock lock = null;
612
613 while (true) {
614 try {
615 if (owner == null) {
616 lock = LockLocalServiceUtil.lock(
617 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
618 _localClusterNodeAddress,
619 PropsValues.
620 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
621 }
622 else {
623 lock = LockLocalServiceUtil.lock(
624 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
625 _localClusterNodeAddress,
626 PropsValues.
627 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
628 }
629
630 Address address = (Address)getDeserializedObject(
631 lock.getOwner());
632
633 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
634 break;
635 }
636 else {
637 owner = lock.getOwner();
638 }
639 }
640 catch (Exception e) {
641 if (_log.isWarnEnabled()) {
642 _log.warn(
643 "Unable to obtain memory scheduler cluster lock. " +
644 "Trying again.");
645 }
646 }
647 }
648
649 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
650
651 if (master == _master) {
652 return lock.getOwner();
653 }
654
655 if (master) {
656 slaveToMaster();
657 }
658 else {
659 masterToSlave(lock.getOwner(), asynchronous);
660 }
661
662 return lock.getOwner();
663 }
664
665 protected String getSerializedString(Object object) throws Exception {
666 UnsyncByteArrayOutputStream byteArrayOutputStream =
667 new UnsyncByteArrayOutputStream();
668
669 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
670 byteArrayOutputStream);
671
672 objectOutputStream.writeObject(object);
673 objectOutputStream.close();
674
675 byte[] bytes = byteArrayOutputStream.toByteArray();
676
677 return Base64.encode(bytes);
678 }
679
680 protected void initMemoryClusteredJobs(
681 List<SchedulerResponse> schedulerResponses)
682 throws Exception {
683
684 for (SchedulerResponse schedulerResponse : schedulerResponses) {
685 Trigger oldTrigger = schedulerResponse.getTrigger();
686
687 String jobName = schedulerResponse.getJobName();
688 String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
689 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
690
691 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
692 oldTrigger.getTriggerType(), jobName, groupName,
693 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
694 oldTrigger.getTriggerContent());
695
696 schedulerResponse.setTrigger(newTrigger);
697
698 TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
699 schedulerResponse);
700
701 Message message = schedulerResponse.getMessage();
702
703 message.remove(JOB_STATE);
704
705 _memoryClusteredJobs.put(
706 getFullName(jobName, groupName),
707 new ObjectValuePair<SchedulerResponse, TriggerState>(
708 schedulerResponse, triggerState));
709 }
710 }
711
712 protected boolean isMemoryClusteredSlaveJob(String groupName)
713 throws SchedulerException {
714
715 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
716 groupName);
717
718 StorageType storageType = objectValuePair.getValue();
719
720 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
721 return false;
722 }
723
724 String masterAddressString = getMasterAddressString(false);
725
726 if (_localClusterNodeAddress.equals(masterAddressString)) {
727 return false;
728 }
729
730 return true;
731 }
732
733 protected void masterToSlave(
734 String masterAddressString, boolean asynchronous)
735 throws SchedulerException {
736
737 if (asynchronous) {
738 MethodHandler methodHandler = new MethodHandler(
739 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
740
741 Address address = (Address)getDeserializedObject(
742 masterAddressString);
743
744 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
745 methodHandler, address);
746
747 try {
748 ClusterExecutorUtil.execute(
749 clusterRequest,
750 new MemorySchedulerClusterResponseCallback(address), 20,
751 TimeUnit.SECONDS);
752
753 return;
754 }
755 catch (Exception e) {
756 throw new SchedulerException(
757 "Unable to load scheduled jobs from cluster node " +
758 address.getDescription(),
759 e);
760 }
761 }
762
763 List<SchedulerResponse> schedulerResponses = callMaster(
764 masterAddressString, _getScheduledJobsMethodKey3,
765 StorageType.MEMORY_CLUSTERED);
766
767 _doMasterToSlave(schedulerResponses);
768 }
769
770 protected void removeMemoryClusteredJobs(String groupName) {
771 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
772 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
773
774 Iterator
775 <Map.Entry<String,
776 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
777 memoryClusteredJobs.iterator();
778
779 while (itr.hasNext()) {
780 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
781 entry = itr.next();
782
783 ObjectValuePair<SchedulerResponse, TriggerState>
784 memoryClusteredJob = entry.getValue();
785
786 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
787
788 if (groupName.equals(schedulerResponse.getGroupName())) {
789 itr.remove();
790 }
791 }
792 }
793
794 protected ObjectValuePair<String, StorageType> resolveGroupName(
795 String groupName) {
796
797 int index = groupName.indexOf(CharPool.POUND);
798
799 String storageTypeString = groupName.substring(0, index);
800
801 StorageType storageType = StorageType.valueOf(storageTypeString);
802
803 String orginalGroupName = groupName.substring(index + 1);
804
805 return new ObjectValuePair<String, StorageType>(
806 orginalGroupName, storageType);
807 }
808
809 protected void setClusterableThreadLocal(String groupName) {
810 ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
811 groupName);
812
813 ClusterableContextThreadLocal.putThreadLocalContext(
814 STORAGE_TYPE, objectValuePair.getValue());
815 ClusterableContextThreadLocal.putThreadLocalContext(
816 _PORTAL_READY, _portalReady);
817
818 boolean pluginReady = true;
819
820 if (PluginContextLifecycleThreadLocal.isInitializing() ||
821 PluginContextLifecycleThreadLocal.isDestroying()) {
822
823 pluginReady = false;
824 }
825
826 ClusterableContextThreadLocal.putThreadLocalContext(
827 _PLUGIN_READY, pluginReady);
828 }
829
830 protected void slaveToMaster() throws SchedulerException {
831 boolean forceSync = ProxyModeThreadLocal.isForceSync();
832
833 ProxyModeThreadLocal.setForceSync(true);
834
835 _writeLock.lock();
836
837 try {
838 for (ObjectValuePair<SchedulerResponse, TriggerState>
839 memoryClusteredJob : _memoryClusteredJobs.values()) {
840
841 SchedulerResponse schedulerResponse =
842 memoryClusteredJob.getKey();
843
844 _schedulerEngine.schedule(
845 schedulerResponse.getTrigger(),
846 schedulerResponse.getDescription(),
847 schedulerResponse.getDestinationName(),
848 schedulerResponse.getMessage());
849
850 TriggerState triggerState = memoryClusteredJob.getValue();
851
852 if (triggerState.equals(TriggerState.PAUSED)) {
853 _schedulerEngine.pause(
854 schedulerResponse.getJobName(),
855 schedulerResponse.getGroupName());
856 }
857 }
858
859 _memoryClusteredJobs.clear();
860 }
861 finally {
862 ProxyModeThreadLocal.setForceSync(forceSync);
863
864 _master = true;
865
866 _writeLock.unlock();
867 }
868 }
869
870 protected void updateMemoryClusteredJob(
871 String jobName, String groupName, TriggerState triggerState) {
872
873 ObjectValuePair<SchedulerResponse, TriggerState>
874 memoryClusteredJob = _memoryClusteredJobs.get(
875 getFullName(jobName, groupName));
876
877 if (memoryClusteredJob != null) {
878 memoryClusteredJob.setValue(triggerState);
879 }
880 }
881
882 protected void updateMemoryClusteredJobs(
883 String groupName, TriggerState triggerState) {
884
885 for (ObjectValuePair<SchedulerResponse, TriggerState>
886 memoryClusteredJob : _memoryClusteredJobs.values()) {
887
888 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
889
890 if (groupName.equals(schedulerResponse.getGroupName())) {
891 memoryClusteredJob.setValue(triggerState);
892 }
893 }
894 }
895
896 @BeanReference(
897 name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
898 protected SchedulerEngine schedulerEngine;
899
900 private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
901 throws SchedulerException {
902
903 _writeLock.lock();
904
905 try {
906 for (SchedulerResponse schedulerResponse :
907 _schedulerEngine.getScheduledJobs()) {
908
909 if (StorageType.MEMORY_CLUSTERED ==
910 schedulerResponse.getStorageType()) {
911
912 String groupName = StorageType.MEMORY_CLUSTERED.toString();
913
914 groupName = groupName.concat(StringPool.POUND).concat(
915 schedulerResponse.getGroupName());
916
917 _schedulerEngine.delete(
918 schedulerResponse.getJobName(), groupName);
919 }
920 }
921
922 initMemoryClusteredJobs(schedulerResponses);
923
924 if (_log.isInfoEnabled()) {
925 _log.info("Switched current node from master to slave");
926 }
927 }
928 catch (Exception e) {
929 throw new SchedulerException(e);
930 }
931 finally {
932 _master = false;
933
934 _writeLock.unlock();
935 }
936 }
937
938 private static final String _LOCK_CLASS_NAME =
939 SchedulerEngine.class.getName();
940
941 private static final String _PLUGIN_READY = "plugin.ready";
942
943 private static final String _PORTAL_READY = "portal.ready";
944
945 private static Log _log = LogFactoryUtil.getLog(
946 ClusterSchedulerEngine.class);
947
948 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
949 SchedulerEngineHelperUtil.class.getName(), "getScheduledJob",
950 String.class, String.class, StorageType.class);
951 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
952 SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs");
953 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
954 SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs",
955 String.class, StorageType.class);
956 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
957 SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs",
958 StorageType.class);
959
960 private String _beanIdentifier;
961 private ClusterEventListener _clusterEventListener;
962 private volatile String _localClusterNodeAddress;
963 private volatile boolean _master;
964 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
965 _memoryClusteredJobs = new ConcurrentHashMap
966 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
967 private boolean _portalReady;
968 private java.util.concurrent.locks.Lock _readLock;
969 private SchedulerEngine _schedulerEngine;
970 private java.util.concurrent.locks.Lock _writeLock;
971
972 private static class SchedulerClusterInvokeAcceptor
973 implements ClusterInvokeAcceptor {
974
975 @Override
976 public boolean accept(Map<String, Serializable> context) {
977 if (ClusterInvokeThreadLocal.isEnabled()) {
978 return true;
979 }
980
981 StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
982 boolean portalReady = (Boolean)context.get(_PORTAL_READY);
983 boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
984
985 if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
986 !pluginReady) {
987
988 return false;
989 }
990
991 return true;
992 }
993
994 }
995
996 private class MemorySchedulerClusterEventListener
997 implements ClusterEventListener {
998
999 @Override
1000 public void processClusterEvent(ClusterEvent clusterEvent) {
1001 try {
1002 getMasterAddressString(true);
1003 }
1004 catch (Exception e) {
1005 _log.error("Unable to update memory scheduler cluster lock", e);
1006 }
1007 }
1008
1009 }
1010
1011 private class MemorySchedulerClusterResponseCallback
1012 extends BaseClusterResponseCallback {
1013
1014 public MemorySchedulerClusterResponseCallback(Address address) {
1015 _address = address;
1016 }
1017
1018 @Override
1019 public void callback(ClusterNodeResponses clusterNodeResponses) {
1020 try {
1021 ClusterNodeResponse clusterNodeResponse =
1022 clusterNodeResponses.getClusterResponse(_address);
1023
1024 List<SchedulerResponse> schedulerResponses =
1025 (List<SchedulerResponse>)clusterNodeResponse.getResult();
1026
1027 _doMasterToSlave(schedulerResponses);
1028 }
1029 catch (Exception e) {
1030 _log.error(
1031 "Unable to load memory clustered jobs from cluster node " +
1032 _address.getDescription(),
1033 e);
1034 }
1035 }
1036
1037 @Override
1038 public void processTimeoutException(TimeoutException timeoutException) {
1039 _log.error(
1040 "Unable to load memory clustered jobs from cluster node " +
1041 _address.getDescription(),
1042 timeoutException);
1043 }
1044
1045 private Address _address;
1046
1047 }
1048
1049 }