001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
018    import com.liferay.portal.kernel.bean.BeanReference;
019    import com.liferay.portal.kernel.bean.IdentifiableBean;
020    import com.liferay.portal.kernel.cluster.Address;
021    import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
022    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023    import com.liferay.portal.kernel.cluster.ClusterEvent;
024    import com.liferay.portal.kernel.cluster.ClusterEventListener;
025    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
028    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
029    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
030    import com.liferay.portal.kernel.cluster.ClusterRequest;
031    import com.liferay.portal.kernel.cluster.Clusterable;
032    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
033    import com.liferay.portal.kernel.log.Log;
034    import com.liferay.portal.kernel.log.LogFactoryUtil;
035    import com.liferay.portal.kernel.messaging.Message;
036    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
037    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
038    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
039    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
040    import com.liferay.portal.kernel.scheduler.SchedulerException;
041    import com.liferay.portal.kernel.scheduler.StorageType;
042    import com.liferay.portal.kernel.scheduler.Trigger;
043    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
044    import com.liferay.portal.kernel.scheduler.TriggerState;
045    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
046    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
047    import com.liferay.portal.kernel.util.CharPool;
048    import com.liferay.portal.kernel.util.MethodHandler;
049    import com.liferay.portal.kernel.util.MethodKey;
050    import com.liferay.portal.kernel.util.ObjectValuePair;
051    import com.liferay.portal.kernel.util.StringPool;
052    import com.liferay.portal.kernel.util.Validator;
053    import com.liferay.portal.model.Lock;
054    import com.liferay.portal.service.LockLocalServiceUtil;
055    import com.liferay.portal.util.PropsValues;
056    
057    import java.io.Serializable;
058    
059    import java.util.Iterator;
060    import java.util.List;
061    import java.util.Map;
062    import java.util.Set;
063    import java.util.concurrent.ConcurrentHashMap;
064    import java.util.concurrent.TimeUnit;
065    import java.util.concurrent.TimeoutException;
066    import java.util.concurrent.locks.ReadWriteLock;
067    import java.util.concurrent.locks.ReentrantReadWriteLock;
068    
069    /**
070     * @author Tina Tian
071     */
072    public class ClusterSchedulerEngine
073            implements IdentifiableBean, SchedulerEngine,
074                               SchedulerEngineClusterManager {
075    
076            public static SchedulerEngine createClusterSchedulerEngine(
077                    SchedulerEngine schedulerEngine) {
078    
079                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
080                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
081                    }
082    
083                    return schedulerEngine;
084            }
085    
/**
 * Creates a cluster-aware scheduler engine that delegates to the given
 * engine.
 *
 * @param schedulerEngine the wrapped engine that receives delegated calls
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
089    
090            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
091            @Override
092            public void delete(String groupName) throws SchedulerException {
093                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
094    
095                    _readLock.lock();
096    
097                    try {
098                            if (memoryClusteredSlaveJob) {
099                                    removeMemoryClusteredJobs(groupName);
100                            }
101                            else {
102                                    _schedulerEngine.delete(groupName);
103                            }
104                    }
105                    finally {
106                            _readLock.unlock();
107                    }
108    
109                    setClusterableThreadLocal(groupName);
110            }
111    
112            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
113            @Override
114            public void delete(String jobName, String groupName)
115                    throws SchedulerException {
116    
117                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
118    
119                    _readLock.lock();
120    
121                    try {
122                            if (memoryClusteredSlaveJob) {
123                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
124                            }
125                            else {
126                                    _schedulerEngine.delete(jobName, groupName);
127                            }
128                    }
129                    finally {
130                            _readLock.unlock();
131                    }
132    
133                    setClusterableThreadLocal(groupName);
134            }
135    
136            @Override
137            public String getBeanIdentifier() {
138                    return _beanIdentifier;
139            }
140    
141            @Override
142            public SchedulerResponse getScheduledJob(String jobName, String groupName)
143                    throws SchedulerException {
144    
145                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
146                            groupName);
147    
148                    StorageType storageType = objectValuePair.getValue();
149    
150                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
151                            String masterAddressString = getMasterAddressString(false);
152    
153                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
154                                    return callMaster(
155                                            masterAddressString, _getScheduledJobMethodKey, jobName,
156                                            objectValuePair.getKey(), storageType);
157                            }
158                    }
159    
160                    _readLock.lock();
161    
162                    try {
163                            return _schedulerEngine.getScheduledJob(jobName, groupName);
164                    }
165                    finally {
166                            _readLock.unlock();
167                    }
168            }
169    
170            @Override
171            public List<SchedulerResponse> getScheduledJobs()
172                    throws SchedulerException {
173    
174                    String masterAddressString = getMasterAddressString(false);
175    
176                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
177                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
178                    }
179    
180                    _readLock.lock();
181    
182                    try {
183                            return _schedulerEngine.getScheduledJobs();
184                    }
185                    finally {
186                            _readLock.unlock();
187                    }
188            }
189    
190            @Override
191            public List<SchedulerResponse> getScheduledJobs(String groupName)
192                    throws SchedulerException {
193    
194                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
195                            groupName);
196    
197                    StorageType storageType = objectValuePair.getValue();
198    
199                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
200                            String masterAddressString = getMasterAddressString(false);
201    
202                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
203                                    return callMaster(
204                                            masterAddressString, _getScheduledJobsMethodKey2,
205                                            objectValuePair.getKey(), storageType);
206                            }
207                    }
208    
209                    _readLock.lock();
210    
211                    try {
212                            return _schedulerEngine.getScheduledJobs(groupName);
213                    }
214                    finally {
215                            _readLock.unlock();
216                    }
217            }
218    
219            @Override
220            public void initialize() throws SchedulerException {
221                    try {
222                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
223    
224                            _readLock = readWriteLock.readLock();
225                            _writeLock = readWriteLock.writeLock();
226    
227                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
228                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
229    
230                            _clusterEventListener = new MemorySchedulerClusterEventListener();
231    
232                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
233    
234                            String masterAddressString = getMasterAddressString(false);
235    
236                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
237                                    List<SchedulerResponse> schedulerResponses = callMaster(
238                                            masterAddressString, _getScheduledJobsMethodKey3,
239                                            StorageType.MEMORY_CLUSTERED);
240    
241                                    initMemoryClusteredJobs(schedulerResponses);
242                            }
243                    }
244                    catch (Exception e) {
245                            throw new SchedulerException("Unable to initialize scheduler", e);
246                    }
247            }
248    
249            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
250            @Override
251            public void pause(String groupName) throws SchedulerException {
252                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
253    
254                    _readLock.lock();
255    
256                    try {
257                            if (memoryClusteredSlaveJob) {
258                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
259                            }
260                            else {
261                                    _schedulerEngine.pause(groupName);
262                            }
263                    }
264                    finally {
265                            _readLock.unlock();
266                    }
267    
268                    setClusterableThreadLocal(groupName);
269            }
270    
271            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
272            @Override
273            public void pause(String jobName, String groupName)
274                    throws SchedulerException {
275    
276                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
277    
278                    _readLock.lock();
279    
280                    try {
281                            if (memoryClusteredSlaveJob) {
282                                    updateMemoryClusteredJob(
283                                            jobName, groupName, TriggerState.PAUSED);
284                            }
285                            else {
286                                    _schedulerEngine.pause(jobName, groupName);
287                            }
288                    }
289                    finally {
290                            _readLock.unlock();
291                    }
292    
293                    setClusterableThreadLocal(groupName);
294            }
295    
296            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
297            @Override
298            public void resume(String groupName) throws SchedulerException {
299                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
300    
301                    _readLock.lock();
302    
303                    try {
304                            if (memoryClusteredSlaveJob) {
305                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
306                            }
307                            else {
308                                    _schedulerEngine.resume(groupName);
309                            }
310                    }
311                    finally {
312                            _readLock.unlock();
313                    }
314    
315                    setClusterableThreadLocal(groupName);
316            }
317    
318            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
319            @Override
320            public void resume(String jobName, String groupName)
321                    throws SchedulerException {
322    
323                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
324    
325                    _readLock.lock();
326    
327                    try {
328                            if (memoryClusteredSlaveJob) {
329                                    updateMemoryClusteredJob(
330                                            jobName, groupName, TriggerState.NORMAL);
331                            }
332                            else {
333                                    _schedulerEngine.resume(jobName, groupName);
334                            }
335                    }
336                    finally {
337                            _readLock.unlock();
338                    }
339    
340                    setClusterableThreadLocal(groupName);
341            }
342    
343            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
344            @Override
345            public void schedule(
346                            Trigger trigger, String description, String destinationName,
347                            Message message)
348                    throws SchedulerException {
349    
350                    String groupName = trigger.getGroupName();
351                    String jobName = trigger.getJobName();
352    
353                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
354    
355                    _readLock.lock();
356    
357                    try {
358                            if (memoryClusteredSlaveJob) {
359                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
360    
361                                    schedulerResponse.setDescription(description);
362                                    schedulerResponse.setDestinationName(destinationName);
363                                    schedulerResponse.setGroupName(groupName);
364                                    schedulerResponse.setJobName(jobName);
365                                    schedulerResponse.setMessage(message);
366                                    schedulerResponse.setTrigger(trigger);
367    
368                                    _memoryClusteredJobs.put(
369                                            getFullName(jobName, groupName),
370                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
371                                                    schedulerResponse, TriggerState.NORMAL));
372                            }
373                            else {
374                                    _schedulerEngine.schedule(
375                                            trigger, description, destinationName, message);
376                            }
377                    }
378                    finally {
379                            _readLock.unlock();
380                    }
381    
382                    setClusterableThreadLocal(groupName);
383            }
384    
385            @Override
386            public void setBeanIdentifier(String beanIdentifier) {
387                    _beanIdentifier = beanIdentifier;
388            }
389    
390            @Override
391            public void shutdown() throws SchedulerException {
392                    _portalReady = false;
393    
394                    try {
395                            ClusterExecutorUtil.removeClusterEventListener(
396                                    _clusterEventListener);
397    
398                            LockLocalServiceUtil.unlock(
399                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
400                    }
401                    catch (Exception e) {
402                            throw new SchedulerException("Unable to shutdown scheduler", e);
403                    }
404    
405                    _schedulerEngine.shutdown();
406            }
407    
408            @Override
409            public void start() throws SchedulerException {
410                    _schedulerEngine.start();
411    
412                    _portalReady = true;
413            }
414    
415            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
416            @Override
417            public void suppressError(String jobName, String groupName)
418                    throws SchedulerException {
419    
420                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
421    
422                    if (!memoryClusteredSlaveJob) {
423                            _readLock.lock();
424    
425                            try {
426                                    _schedulerEngine.suppressError(jobName, groupName);
427                            }
428                            finally {
429                                    _readLock.unlock();
430                            }
431                    }
432    
433                    setClusterableThreadLocal(groupName);
434            }
435    
436            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
437            @Override
438            public void unschedule(String groupName) throws SchedulerException {
439                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
440    
441                    _readLock.lock();
442    
443                    try {
444                            if (memoryClusteredSlaveJob) {
445                                    removeMemoryClusteredJobs(groupName);
446                            }
447                            else {
448                                    _schedulerEngine.unschedule(groupName);
449                            }
450                    }
451                    finally {
452                            _readLock.unlock();
453                    }
454    
455                    setClusterableThreadLocal(groupName);
456            }
457    
458            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
459            @Override
460            public void unschedule(String jobName, String groupName)
461                    throws SchedulerException {
462    
463                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
464    
465                    _readLock.lock();
466    
467                    try {
468                            if (memoryClusteredSlaveJob) {
469                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
470                            }
471                            else {
472                                    _schedulerEngine.unschedule(jobName, groupName);
473                            }
474                    }
475                    finally {
476                            _readLock.unlock();
477                    }
478    
479                    setClusterableThreadLocal(groupName);
480            }
481    
482            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
483            @Override
484            public void update(Trigger trigger) throws SchedulerException {
485                    String jobName = trigger.getJobName();
486                    String groupName = trigger.getGroupName();
487    
488                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
489    
490                    _readLock.lock();
491    
492                    try {
493                            if (memoryClusteredSlaveJob) {
494                                    boolean updated = false;
495    
496                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
497                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
498    
499                                            SchedulerResponse schedulerResponse =
500                                                    memoryClusteredJob.getKey();
501    
502                                            if (jobName.equals(schedulerResponse.getJobName()) &&
503                                                    groupName.equals(schedulerResponse.getGroupName())) {
504    
505                                                    schedulerResponse.setTrigger(trigger);
506    
507                                                    updated = true;
508    
509                                                    break;
510                                            }
511                                    }
512    
513                                    if (!updated) {
514                                            throw new SchedulerException(
515                                                    "Unable to update trigger for memory clustered job");
516                                    }
517                            }
518                            else {
519                                    _schedulerEngine.update(trigger);
520                            }
521                    }
522                    finally {
523                            _readLock.unlock();
524                    }
525    
526                    setClusterableThreadLocal(groupName);
527            }
528    
529            @Override
530            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
531                    getMasterAddressString(false);
532    
533                    return null;
534            }
535    
536            protected <T> T callMaster(
537                            String masterAddressString, MethodKey methodKey,
538                            Object... arguments)
539                    throws SchedulerException {
540    
541                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
542    
543                    Address address = AddressSerializerUtil.deserialize(
544                            masterAddressString);
545    
546                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
547                            methodHandler, address);
548    
549                    try {
550                            FutureClusterResponses futureClusterResponses =
551                                    ClusterExecutorUtil.execute(clusterRequest);
552    
553                            ClusterNodeResponses clusterNodeResponses =
554                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
555    
556                            ClusterNodeResponse clusterNodeResponse =
557                                    clusterNodeResponses.getClusterResponse(address);
558    
559                            return (T)clusterNodeResponse.getResult();
560                    }
561                    catch (Exception e) {
562                            throw new SchedulerException(
563                                    "Unable to load scheduled jobs from cluster node " +
564                                            address.getDescription(),
565                                    e);
566                    }
567            }
568    
569            protected String getFullName(String jobName, String groupName) {
570                    return groupName.concat(StringPool.PERIOD).concat(jobName);
571            }
572    
573            protected String getMasterAddressString(boolean asynchronous)
574                    throws SchedulerException {
575    
576                    String owner = null;
577    
578                    while (true) {
579                            try {
580                                    Lock lock = null;
581    
582                                    if (owner == null) {
583                                            lock = LockLocalServiceUtil.lock(
584                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
585                                                    _localClusterNodeAddress);
586                                    }
587                                    else {
588                                            lock = LockLocalServiceUtil.lock(
589                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
590                                                    _localClusterNodeAddress);
591                                    }
592    
593                                    owner = lock.getOwner();
594    
595                                    Address address = AddressSerializerUtil.deserialize(owner);
596    
597                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
598                                            break;
599                                    }
600                            }
601                            catch (Exception e) {
602                                    if (_log.isWarnEnabled()) {
603                                            _log.warn(
604                                                    "Could not acquire memory scheduler cluster lock", e);
605                                    }
606                            }
607    
608                            if (_log.isInfoEnabled()) {
609                                    _log.info("Could not acquire scheduler cluster lock, retrying");
610    
611                                    if (Validator.isNotNull(owner)) {
612                                            _log.info("Lock currently held by " + owner);
613                                    }
614                            }
615                    }
616    
617                    boolean master = _localClusterNodeAddress.equals(owner);
618    
619                    if (master == _master) {
620                            return owner;
621                    }
622    
623                    if (master) {
624                            slaveToMaster();
625                    }
626                    else {
627                            masterToSlave(owner, asynchronous);
628                    }
629    
630                    return owner;
631            }
632    
633            protected void initMemoryClusteredJobs(
634                            List<SchedulerResponse> schedulerResponses)
635                    throws Exception {
636    
637                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
638                            Trigger oldTrigger = schedulerResponse.getTrigger();
639    
640                            String jobName = schedulerResponse.getJobName();
641                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
642                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
643    
644                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
645                                    oldTrigger.getTriggerType(), jobName, groupName,
646                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
647                                    oldTrigger.getTriggerContent());
648    
649                            schedulerResponse.setTrigger(newTrigger);
650    
651                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
652                                    schedulerResponse);
653    
654                            Message message = schedulerResponse.getMessage();
655    
656                            message.remove(JOB_STATE);
657    
658                            _memoryClusteredJobs.put(
659                                    getFullName(jobName, groupName),
660                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
661                                            schedulerResponse, triggerState));
662                    }
663            }
664    
665            protected boolean isMemoryClusteredSlaveJob(String groupName)
666                    throws SchedulerException {
667    
668                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
669                            groupName);
670    
671                    StorageType storageType = objectValuePair.getValue();
672    
673                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
674                            return false;
675                    }
676    
677                    String masterAddressString = getMasterAddressString(false);
678    
679                    if (_localClusterNodeAddress.equals(masterAddressString)) {
680                            return false;
681                    }
682    
683                    return true;
684            }
685    
686            protected void masterToSlave(
687                            String masterAddressString, boolean asynchronous)
688                    throws SchedulerException {
689    
690                    if (asynchronous) {
691                            MethodHandler methodHandler = new MethodHandler(
692                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
693    
694                            Address address = AddressSerializerUtil.deserialize(
695                                    masterAddressString);
696    
697                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
698                                    methodHandler, address);
699    
700                            try {
701                                    ClusterExecutorUtil.execute(
702                                            clusterRequest,
703                                            new MemorySchedulerClusterResponseCallback(address), 20,
704                                            TimeUnit.SECONDS);
705    
706                                    return;
707                            }
708                            catch (Exception e) {
709                                    throw new SchedulerException(
710                                            "Unable to load scheduled jobs from cluster node " +
711                                                    address.getDescription(),
712                                            e);
713                            }
714                    }
715    
716                    List<SchedulerResponse> schedulerResponses = callMaster(
717                            masterAddressString, _getScheduledJobsMethodKey3,
718                            StorageType.MEMORY_CLUSTERED);
719    
720                    _doMasterToSlave(schedulerResponses);
721            }
722    
723            protected void removeMemoryClusteredJobs(String groupName) {
724                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
725                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
726    
727                    Iterator
728                            <Map.Entry<String,
729                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
730                                            memoryClusteredJobs.iterator();
731    
732                    while (itr.hasNext()) {
733                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
734                                    entry = itr.next();
735    
736                            ObjectValuePair<SchedulerResponse, TriggerState>
737                                    memoryClusteredJob = entry.getValue();
738    
739                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
740    
741                            if (groupName.equals(schedulerResponse.getGroupName())) {
742                                    itr.remove();
743                            }
744                    }
745            }
746    
747            protected ObjectValuePair<String, StorageType> resolveGroupName(
748                    String groupName) {
749    
750                    int index = groupName.indexOf(CharPool.POUND);
751    
752                    String storageTypeString = groupName.substring(0, index);
753    
754                    StorageType storageType = StorageType.valueOf(storageTypeString);
755    
756                    String orginalGroupName = groupName.substring(index + 1);
757    
758                    return new ObjectValuePair<String, StorageType>(
759                            orginalGroupName, storageType);
760            }
761    
762            protected void setClusterableThreadLocal(String groupName) {
763                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
764                            groupName);
765    
766                    ClusterableContextThreadLocal.putThreadLocalContext(
767                            STORAGE_TYPE, objectValuePair.getValue());
768                    ClusterableContextThreadLocal.putThreadLocalContext(
769                            _PORTAL_READY, _portalReady);
770    
771                    boolean pluginReady = true;
772    
773                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
774                            PluginContextLifecycleThreadLocal.isDestroying()) {
775    
776                            pluginReady = false;
777                    }
778    
779                    ClusterableContextThreadLocal.putThreadLocalContext(
780                            _PLUGIN_READY, pluginReady);
781            }
782    
783            protected void slaveToMaster() throws SchedulerException {
784                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
785    
786                    ProxyModeThreadLocal.setForceSync(true);
787    
788                    _writeLock.lock();
789    
790                    try {
791                            for (ObjectValuePair<SchedulerResponse, TriggerState>
792                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
793    
794                                    SchedulerResponse schedulerResponse =
795                                            memoryClusteredJob.getKey();
796    
797                                    _schedulerEngine.schedule(
798                                            schedulerResponse.getTrigger(),
799                                            schedulerResponse.getDescription(),
800                                            schedulerResponse.getDestinationName(),
801                                            schedulerResponse.getMessage());
802    
803                                    TriggerState triggerState = memoryClusteredJob.getValue();
804    
805                                    if (triggerState.equals(TriggerState.PAUSED)) {
806                                            _schedulerEngine.pause(
807                                                    schedulerResponse.getJobName(),
808                                                    schedulerResponse.getGroupName());
809                                    }
810                            }
811    
812                            _memoryClusteredJobs.clear();
813                    }
814                    finally {
815                            ProxyModeThreadLocal.setForceSync(forceSync);
816    
817                            _master = true;
818    
819                            _writeLock.unlock();
820                    }
821            }
822    
823            protected void updateMemoryClusteredJob(
824                    String jobName, String groupName, TriggerState triggerState) {
825    
826                    ObjectValuePair<SchedulerResponse, TriggerState>
827                            memoryClusteredJob = _memoryClusteredJobs.get(
828                                    getFullName(jobName, groupName));
829    
830                    if (memoryClusteredJob != null) {
831                            memoryClusteredJob.setValue(triggerState);
832                    }
833            }
834    
835            protected void updateMemoryClusteredJobs(
836                    String groupName, TriggerState triggerState) {
837    
838                    for (ObjectValuePair<SchedulerResponse, TriggerState>
839                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
840    
841                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
842    
843                            if (groupName.equals(schedulerResponse.getGroupName())) {
844                                    memoryClusteredJob.setValue(triggerState);
845                            }
846                    }
847            }
848    
        /**
         * Scheduler engine reference bound by name to the
         * <code>com.liferay.portal.scheduler.ClusterSchedulerEngineService</code>
         * bean through {@link BeanReference}.
         *
         * <p>
         * NOTE(review): presumably populated by the bean container; the methods
         * visible in this part of the class operate on the private
         * <code>_schedulerEngine</code> field rather than on this one. Confirm
         * how the two relate.
         * </p>
         */
        @BeanReference(
                name = "com.liferay.portal.scheduler.ClusterSchedulerEngineService")
        protected SchedulerEngine schedulerEngine;
852    
853            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
854                    throws SchedulerException {
855    
856                    _writeLock.lock();
857    
858                    try {
859                            for (SchedulerResponse schedulerResponse :
860                                            _schedulerEngine.getScheduledJobs()) {
861    
862                                    if (StorageType.MEMORY_CLUSTERED ==
863                                                    schedulerResponse.getStorageType()) {
864    
865                                            String groupName = StorageType.MEMORY_CLUSTERED.toString();
866    
867                                            groupName = groupName.concat(StringPool.POUND).concat(
868                                                    schedulerResponse.getGroupName());
869    
870                                            _schedulerEngine.delete(
871                                                    schedulerResponse.getJobName(), groupName);
872                                    }
873                            }
874    
875                            initMemoryClusteredJobs(schedulerResponses);
876    
877                            if (_log.isInfoEnabled()) {
878                                    _log.info("Switched current node from master to slave");
879                            }
880                    }
881                    catch (Exception e) {
882                            throw new SchedulerException(e);
883                    }
884                    finally {
885                            _master = false;
886    
887                            _writeLock.unlock();
888                    }
889            }
890    
        // Fully qualified name of SchedulerEngine. NOTE(review): presumably the
        // class name under which the memory scheduler cluster lock is kept; its
        // usage is outside this part of the file

        private static final String _LOCK_CLASS_NAME =
                SchedulerEngine.class.getName();

        // Clusterable context key for the plugin ready flag, written by
        // setClusterableThreadLocal and read by SchedulerClusterInvokeAcceptor

        private static final String _PLUGIN_READY = "plugin.ready";

        // Clusterable context key for the portal ready flag, written by
        // setClusterableThreadLocal and read by SchedulerClusterInvokeAcceptor

        private static final String _PORTAL_READY = "portal.ready";

        private static Log _log = LogFactoryUtil.getLog(
                ClusterSchedulerEngine.class);

        // Method keys for SchedulerEngineHelperUtil methods. masterToSlave uses
        // _getScheduledJobsMethodKey3 to fetch the master's jobs by storage type

        private static MethodKey _getScheduledJobMethodKey = new MethodKey(
                SchedulerEngineHelperUtil.class, "getScheduledJob", String.class,
                String.class, StorageType.class);
        private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
                SchedulerEngineHelperUtil.class, "getScheduledJobs");
        private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
                SchedulerEngineHelperUtil.class, "getScheduledJobs", String.class,
                StorageType.class);
        private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
                SchedulerEngineHelperUtil.class, "getScheduledJobs", StorageType.class);

        private String _beanIdentifier;
        private ClusterEventListener _clusterEventListener;

        // Compared against the master address string to decide whether this
        // node is a slave for memory clustered jobs

        private volatile String _localClusterNodeAddress;

        // Set to true at the end of slaveToMaster and to false at the end of
        // _doMasterToSlave

        private volatile boolean _master;

        // Cache of memory clustered jobs keyed by getFullName(jobName,
        // groupName); each value pairs the job's scheduler response with its
        // trigger state

        private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
                _memoryClusteredJobs = new ConcurrentHashMap
                        <String, ObjectValuePair<SchedulerResponse, TriggerState>>();

        // Published to the clusterable context under _PORTAL_READY

        private boolean _portalReady;
        private java.util.concurrent.locks.Lock _readLock;

        // Engine on which jobs are scheduled, paused, deleted, and listed

        private SchedulerEngine _schedulerEngine;

        // Held for the whole of slaveToMaster and _doMasterToSlave

        private java.util.concurrent.locks.Lock _writeLock;
923    
924            private static class SchedulerClusterInvokeAcceptor
925                    implements ClusterInvokeAcceptor {
926    
927                    @Override
928                    public boolean accept(Map<String, Serializable> context) {
929                            if (ClusterInvokeThreadLocal.isEnabled()) {
930                                    return true;
931                            }
932    
933                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
934                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
935                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
936    
937                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
938                                    !pluginReady) {
939    
940                                    return false;
941                            }
942    
943                            return true;
944                    }
945    
946            }
947    
948            private class MemorySchedulerClusterEventListener
949                    implements ClusterEventListener {
950    
951                    @Override
952                    public void processClusterEvent(ClusterEvent clusterEvent) {
953                            try {
954                                    getMasterAddressString(true);
955                            }
956                            catch (Exception e) {
957                                    _log.error("Unable to update memory scheduler cluster lock", e);
958                            }
959                    }
960    
961            }
962    
963            private class MemorySchedulerClusterResponseCallback
964                    extends BaseClusterResponseCallback {
965    
966                    public MemorySchedulerClusterResponseCallback(Address address) {
967                            _address = address;
968                    }
969    
970                    @Override
971                    public void callback(ClusterNodeResponses clusterNodeResponses) {
972                            try {
973                                    ClusterNodeResponse clusterNodeResponse =
974                                            clusterNodeResponses.getClusterResponse(_address);
975    
976                                    List<SchedulerResponse> schedulerResponses =
977                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
978    
979                                    _doMasterToSlave(schedulerResponses);
980                            }
981                            catch (Exception e) {
982                                    _log.error(
983                                            "Unable to load memory clustered jobs from cluster node " +
984                                                    _address.getDescription(),
985                                            e);
986                            }
987                    }
988    
989                    @Override
990                    public void processTimeoutException(TimeoutException timeoutException) {
991                            _log.error(
992                                    "Unable to load memory clustered jobs from cluster node " +
993                                            _address.getDescription(),
994                                    timeoutException);
995                    }
996    
997                    private Address _address;
998    
999            }
1000    
1001    }