001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.cluster.ClusterInvokeThreadLocal;
018    import com.liferay.portal.cluster.ClusterableContextThreadLocal;
019    import com.liferay.portal.kernel.bean.BeanReference;
020    import com.liferay.portal.kernel.bean.IdentifiableBean;
021    import com.liferay.portal.kernel.cluster.Address;
022    import com.liferay.portal.kernel.cluster.BaseClusterResponseCallback;
023    import com.liferay.portal.kernel.cluster.ClusterEvent;
024    import com.liferay.portal.kernel.cluster.ClusterEventListener;
025    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
026    import com.liferay.portal.kernel.cluster.ClusterInvokeAcceptor;
027    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
028    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
029    import com.liferay.portal.kernel.cluster.ClusterRequest;
030    import com.liferay.portal.kernel.cluster.Clusterable;
031    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
032    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
033    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
034    import com.liferay.portal.kernel.log.Log;
035    import com.liferay.portal.kernel.log.LogFactoryUtil;
036    import com.liferay.portal.kernel.messaging.Message;
037    import com.liferay.portal.kernel.messaging.proxy.ProxyModeThreadLocal;
038    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
039    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
040    import com.liferay.portal.kernel.scheduler.SchedulerEngineHelperUtil;
041    import com.liferay.portal.kernel.scheduler.SchedulerException;
042    import com.liferay.portal.kernel.scheduler.StorageType;
043    import com.liferay.portal.kernel.scheduler.Trigger;
044    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
045    import com.liferay.portal.kernel.scheduler.TriggerState;
046    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
047    import com.liferay.portal.kernel.servlet.PluginContextLifecycleThreadLocal;
048    import com.liferay.portal.kernel.util.Base64;
049    import com.liferay.portal.kernel.util.CharPool;
050    import com.liferay.portal.kernel.util.MethodHandler;
051    import com.liferay.portal.kernel.util.MethodKey;
052    import com.liferay.portal.kernel.util.ObjectValuePair;
053    import com.liferay.portal.kernel.util.StringPool;
054    import com.liferay.portal.model.Lock;
055    import com.liferay.portal.service.LockLocalServiceUtil;
056    import com.liferay.portal.util.PropsValues;
057    
058    import java.io.ObjectInputStream;
059    import java.io.ObjectOutputStream;
060    import java.io.Serializable;
061    
062    import java.util.Iterator;
063    import java.util.List;
064    import java.util.Map;
065    import java.util.Set;
066    import java.util.concurrent.ConcurrentHashMap;
067    import java.util.concurrent.TimeUnit;
068    import java.util.concurrent.TimeoutException;
069    import java.util.concurrent.locks.ReadWriteLock;
070    import java.util.concurrent.locks.ReentrantReadWriteLock;
071    
072    /**
073     * @author Tina Tian
074     */
075    public class ClusterSchedulerEngine
076            implements IdentifiableBean, SchedulerEngine,
077                               SchedulerEngineClusterManager {
078    
079            public static SchedulerEngine createClusterSchedulerEngine(
080                    SchedulerEngine schedulerEngine) {
081    
082                    if (PropsValues.CLUSTER_LINK_ENABLED && PropsValues.SCHEDULER_ENABLED) {
083                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
084                    }
085    
086                    return schedulerEngine;
087            }
088    
/**
 * Creates a cluster scheduler engine that keeps the given engine as its
 * delegate.
 *
 * @param schedulerEngine the scheduler engine that calls are forwarded to
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
092    
093            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
094            @Override
095            public void delete(String groupName) throws SchedulerException {
096                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
097    
098                    _readLock.lock();
099    
100                    try {
101                            if (memoryClusteredSlaveJob) {
102                                    removeMemoryClusteredJobs(groupName);
103                            }
104                            else {
105                                    _schedulerEngine.delete(groupName);
106                            }
107                    }
108                    finally {
109                            _readLock.unlock();
110                    }
111    
112                    setClusterableThreadLocal(groupName);
113            }
114    
115            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
116            @Override
117            public void delete(String jobName, String groupName)
118                    throws SchedulerException {
119    
120                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
121    
122                    _readLock.lock();
123    
124                    try {
125                            if (memoryClusteredSlaveJob) {
126                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
127                            }
128                            else {
129                                    _schedulerEngine.delete(jobName, groupName);
130                            }
131                    }
132                    finally {
133                            _readLock.unlock();
134                    }
135    
136                    setClusterableThreadLocal(groupName);
137            }
138    
139            @Override
140            public String getBeanIdentifier() {
141                    return _beanIdentifier;
142            }
143    
144            @Override
145            public SchedulerResponse getScheduledJob(String jobName, String groupName)
146                    throws SchedulerException {
147    
148                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
149                            groupName);
150    
151                    StorageType storageType = objectValuePair.getValue();
152    
153                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
154                            String masterAddressString = getMasterAddressString(false);
155    
156                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
157                                    return (SchedulerResponse)callMaster(
158                                            masterAddressString, _getScheduledJobMethodKey, jobName,
159                                            objectValuePair.getKey(), storageType);
160                            }
161                    }
162    
163                    _readLock.lock();
164    
165                    try {
166                            return _schedulerEngine.getScheduledJob(jobName, groupName);
167                    }
168                    finally {
169                            _readLock.unlock();
170                    }
171            }
172    
173            @Override
174            public List<SchedulerResponse> getScheduledJobs()
175                    throws SchedulerException {
176    
177                    String masterAddressString = getMasterAddressString(false);
178    
179                    if (!_localClusterNodeAddress.equals(masterAddressString)) {
180                            return callMaster(masterAddressString, _getScheduledJobsMethodKey1);
181                    }
182    
183                    _readLock.lock();
184    
185                    try {
186                            return _schedulerEngine.getScheduledJobs();
187                    }
188                    finally {
189                            _readLock.unlock();
190                    }
191            }
192    
193            @Override
194            public List<SchedulerResponse> getScheduledJobs(String groupName)
195                    throws SchedulerException {
196    
197                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
198                            groupName);
199    
200                    StorageType storageType = objectValuePair.getValue();
201    
202                    if (storageType.equals(StorageType.MEMORY_CLUSTERED)) {
203                            String masterAddressString = getMasterAddressString(false);
204    
205                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
206                                    return callMaster(
207                                            masterAddressString, _getScheduledJobsMethodKey2,
208                                            objectValuePair.getKey(), storageType);
209                            }
210                    }
211    
212                    _readLock.lock();
213    
214                    try {
215                            return _schedulerEngine.getScheduledJobs(groupName);
216                    }
217                    finally {
218                            _readLock.unlock();
219                    }
220            }
221    
222            @Override
223            public void initialize() throws SchedulerException {
224                    try {
225                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
226    
227                            _readLock = readWriteLock.readLock();
228                            _writeLock = readWriteLock.writeLock();
229    
230                            _localClusterNodeAddress = getSerializedString(
231                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
232    
233                            _clusterEventListener = new MemorySchedulerClusterEventListener();
234    
235                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
236    
237                            String masterAddressString = getMasterAddressString(false);
238    
239                            if (!_localClusterNodeAddress.equals(masterAddressString)) {
240                                    List<SchedulerResponse> schedulerResponses = callMaster(
241                                            masterAddressString, _getScheduledJobsMethodKey3,
242                                            StorageType.MEMORY_CLUSTERED);
243    
244                                    initMemoryClusteredJobs(schedulerResponses);
245                            }
246                    }
247                    catch (Exception e) {
248                            throw new SchedulerException("Unable to initialize scheduler", e);
249                    }
250            }
251    
252            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
253            @Override
254            public void pause(String groupName) throws SchedulerException {
255                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
256    
257                    _readLock.lock();
258    
259                    try {
260                            if (memoryClusteredSlaveJob) {
261                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
262                            }
263                            else {
264                                    _schedulerEngine.pause(groupName);
265                            }
266                    }
267                    finally {
268                            _readLock.unlock();
269                    }
270    
271                    setClusterableThreadLocal(groupName);
272            }
273    
274            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
275            @Override
276            public void pause(String jobName, String groupName)
277                    throws SchedulerException {
278    
279                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
280    
281                    _readLock.lock();
282    
283                    try {
284                            if (memoryClusteredSlaveJob) {
285                                    updateMemoryClusteredJob(
286                                            jobName, groupName, TriggerState.PAUSED);
287                            }
288                            else {
289                                    _schedulerEngine.pause(jobName, groupName);
290                            }
291                    }
292                    finally {
293                            _readLock.unlock();
294                    }
295    
296                    setClusterableThreadLocal(groupName);
297            }
298    
299            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
300            @Override
301            public void resume(String groupName) throws SchedulerException {
302                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
303    
304                    _readLock.lock();
305    
306                    try {
307                            if (memoryClusteredSlaveJob) {
308                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
309                            }
310                            else {
311                                    _schedulerEngine.resume(groupName);
312                            }
313                    }
314                    finally {
315                            _readLock.unlock();
316                    }
317    
318                    setClusterableThreadLocal(groupName);
319            }
320    
321            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
322            @Override
323            public void resume(String jobName, String groupName)
324                    throws SchedulerException {
325    
326                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
327    
328                    _readLock.lock();
329    
330                    try {
331                            if (memoryClusteredSlaveJob) {
332                                    updateMemoryClusteredJob(
333                                            jobName, groupName, TriggerState.NORMAL);
334                            }
335                            else {
336                                    _schedulerEngine.resume(jobName, groupName);
337                            }
338                    }
339                    finally {
340                            _readLock.unlock();
341                    }
342    
343                    setClusterableThreadLocal(groupName);
344            }
345    
346            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
347            @Override
348            public void schedule(
349                            Trigger trigger, String description, String destinationName,
350                            Message message)
351                    throws SchedulerException {
352    
353                    String groupName = trigger.getGroupName();
354                    String jobName = trigger.getJobName();
355    
356                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
357    
358                    _readLock.lock();
359    
360                    try {
361                            if (memoryClusteredSlaveJob) {
362                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
363    
364                                    schedulerResponse.setDescription(description);
365                                    schedulerResponse.setDestinationName(destinationName);
366                                    schedulerResponse.setGroupName(groupName);
367                                    schedulerResponse.setJobName(jobName);
368                                    schedulerResponse.setMessage(message);
369                                    schedulerResponse.setTrigger(trigger);
370    
371                                    _memoryClusteredJobs.put(
372                                            getFullName(jobName, groupName),
373                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
374                                                    schedulerResponse, TriggerState.NORMAL));
375                            }
376                            else {
377                                    _schedulerEngine.schedule(
378                                            trigger, description, destinationName, message);
379                            }
380                    }
381                    finally {
382                            _readLock.unlock();
383                    }
384    
385                    setClusterableThreadLocal(groupName);
386            }
387    
388            @Override
389            public void setBeanIdentifier(String beanIdentifier) {
390                    _beanIdentifier = beanIdentifier;
391            }
392    
393            @Override
394            public void shutdown() throws SchedulerException {
395                    _portalReady = false;
396    
397                    try {
398                            ClusterExecutorUtil.removeClusterEventListener(
399                                    _clusterEventListener);
400    
401                            LockLocalServiceUtil.unlock(
402                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
403                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
404                    }
405                    catch (Exception e) {
406                            throw new SchedulerException("Unable to shutdown scheduler", e);
407                    }
408    
409                    _schedulerEngine.shutdown();
410            }
411    
412            @Override
413            public void start() throws SchedulerException {
414                    _schedulerEngine.start();
415    
416                    _portalReady = true;
417            }
418    
419            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
420            @Override
421            public void suppressError(String jobName, String groupName)
422                    throws SchedulerException {
423    
424                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
425    
426                    if (!memoryClusteredSlaveJob) {
427                            _readLock.lock();
428    
429                            try {
430                                    _schedulerEngine.suppressError(jobName, groupName);
431                            }
432                            finally {
433                                    _readLock.unlock();
434                            }
435                    }
436    
437                    setClusterableThreadLocal(groupName);
438            }
439    
440            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
441            @Override
442            public void unschedule(String groupName) throws SchedulerException {
443                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
444    
445                    _readLock.lock();
446    
447                    try {
448                            if (memoryClusteredSlaveJob) {
449                                    removeMemoryClusteredJobs(groupName);
450                            }
451                            else {
452                                    _schedulerEngine.unschedule(groupName);
453                            }
454                    }
455                    finally {
456                            _readLock.unlock();
457                    }
458    
459                    setClusterableThreadLocal(groupName);
460            }
461    
462            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
463            @Override
464            public void unschedule(String jobName, String groupName)
465                    throws SchedulerException {
466    
467                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
468    
469                    _readLock.lock();
470    
471                    try {
472                            if (memoryClusteredSlaveJob) {
473                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
474                            }
475                            else {
476                                    _schedulerEngine.unschedule(jobName, groupName);
477                            }
478                    }
479                    finally {
480                            _readLock.unlock();
481                    }
482    
483                    setClusterableThreadLocal(groupName);
484            }
485    
486            @Clusterable(acceptor = SchedulerClusterInvokeAcceptor.class)
487            @Override
488            public void update(Trigger trigger) throws SchedulerException {
489                    String jobName = trigger.getJobName();
490                    String groupName = trigger.getGroupName();
491    
492                    boolean memoryClusteredSlaveJob = isMemoryClusteredSlaveJob(groupName);
493    
494                    _readLock.lock();
495    
496                    try {
497                            if (memoryClusteredSlaveJob) {
498                                    boolean updated = false;
499    
500                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
501                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
502    
503                                            SchedulerResponse schedulerResponse =
504                                                    memoryClusteredJob.getKey();
505    
506                                            if (jobName.equals(schedulerResponse.getJobName()) &&
507                                                    groupName.equals(schedulerResponse.getGroupName())) {
508    
509                                                    schedulerResponse.setTrigger(trigger);
510    
511                                                    updated = true;
512    
513                                                    break;
514                                            }
515                                    }
516    
517                                    if (!updated) {
518                                            throw new SchedulerException(
519                                                    "Unable to update trigger for memory clustered job");
520                                    }
521                            }
522                            else {
523                                    _schedulerEngine.update(trigger);
524                            }
525                    }
526                    finally {
527                            _readLock.unlock();
528                    }
529    
530                    setClusterableThreadLocal(groupName);
531            }
532    
533            @Override
534            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
535                    getMasterAddressString(false);
536    
537                    return null;
538            }
539    
540            protected <T> T callMaster(
541                            String masterAddressString, MethodKey methodKey,
542                            Object... arguments)
543                    throws SchedulerException {
544    
545                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
546    
547                    Address address = (Address)getDeserializedObject(masterAddressString);
548    
549                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
550                            methodHandler, address);
551    
552                    try {
553                            FutureClusterResponses futureClusterResponses =
554                                    ClusterExecutorUtil.execute(clusterRequest);
555    
556                            ClusterNodeResponses clusterNodeResponses =
557                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
558    
559                            ClusterNodeResponse clusterNodeResponse =
560                                    clusterNodeResponses.getClusterResponse(address);
561    
562                            return (T)clusterNodeResponse.getResult();
563                    }
564                    catch (Exception e) {
565                            throw new SchedulerException(
566                                    "Unable to load scheduled jobs from cluster node " +
567                                            address.getDescription(),
568                                    e);
569                    }
570            }
571    
572            protected Object getDeserializedObject(String string)
573                    throws SchedulerException {
574    
575                    byte[] bytes = Base64.decode(string);
576    
577                    UnsyncByteArrayInputStream byteArrayInputStream =
578                            new UnsyncByteArrayInputStream(bytes);
579    
580                    ObjectInputStream objectInputStream = null;
581    
582                    try {
583                            objectInputStream = new ObjectInputStream(byteArrayInputStream);
584    
585                            Object object = objectInputStream.readObject();
586    
587                            return object;
588                    }
589                    catch (Exception e) {
590                            throw new SchedulerException(
591                                    "Unable to deserialize object from " + string, e);
592                    }
593                    finally {
594                            try {
595                                    objectInputStream.close();
596                            }
597                            catch (Exception e) {
598                            }
599                    }
600            }
601    
602            protected String getFullName(String jobName, String groupName) {
603                    return groupName.concat(StringPool.PERIOD).concat(jobName);
604            }
605    
606            protected String getMasterAddressString(boolean asynchronous)
607                    throws SchedulerException {
608    
609                    String owner = null;
610    
611                    Lock lock = null;
612    
613                    while (true) {
614                            try {
615                                    if (owner == null) {
616                                            lock = LockLocalServiceUtil.lock(
617                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
618                                                    _localClusterNodeAddress,
619                                                    PropsValues.
620                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
621                                    }
622                                    else {
623                                            lock = LockLocalServiceUtil.lock(
624                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
625                                                    _localClusterNodeAddress,
626                                                    PropsValues.
627                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
628                                    }
629    
630                                    Address address = (Address)getDeserializedObject(
631                                            lock.getOwner());
632    
633                                    if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
634                                            break;
635                                    }
636                                    else {
637                                            owner = lock.getOwner();
638                                    }
639                            }
640                            catch (Exception e) {
641                                    if (_log.isWarnEnabled()) {
642                                            _log.warn(
643                                                    "Unable to obtain memory scheduler cluster lock. " +
644                                                            "Trying again.");
645                                    }
646                            }
647                    }
648    
649                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
650    
651                    if (master == _master) {
652                            return lock.getOwner();
653                    }
654    
655                    if (master) {
656                            slaveToMaster();
657                    }
658                    else {
659                            masterToSlave(lock.getOwner(), asynchronous);
660                    }
661    
662                    return lock.getOwner();
663            }
664    
665            protected String getSerializedString(Object object) throws Exception {
666                    UnsyncByteArrayOutputStream byteArrayOutputStream =
667                            new UnsyncByteArrayOutputStream();
668    
669                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
670                            byteArrayOutputStream);
671    
672                    objectOutputStream.writeObject(object);
673                    objectOutputStream.close();
674    
675                    byte[] bytes = byteArrayOutputStream.toByteArray();
676    
677                    return Base64.encode(bytes);
678            }
679    
680            protected void initMemoryClusteredJobs(
681                            List<SchedulerResponse> schedulerResponses)
682                    throws Exception {
683    
684                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
685                            Trigger oldTrigger = schedulerResponse.getTrigger();
686    
687                            String jobName = schedulerResponse.getJobName();
688                            String groupName = SchedulerEngineHelperUtil.namespaceGroupName(
689                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
690    
691                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
692                                    oldTrigger.getTriggerType(), jobName, groupName,
693                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
694                                    oldTrigger.getTriggerContent());
695    
696                            schedulerResponse.setTrigger(newTrigger);
697    
698                            TriggerState triggerState = SchedulerEngineHelperUtil.getJobState(
699                                    schedulerResponse);
700    
701                            Message message = schedulerResponse.getMessage();
702    
703                            message.remove(JOB_STATE);
704    
705                            _memoryClusteredJobs.put(
706                                    getFullName(jobName, groupName),
707                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
708                                            schedulerResponse, triggerState));
709                    }
710            }
711    
712            protected boolean isMemoryClusteredSlaveJob(String groupName)
713                    throws SchedulerException {
714    
715                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
716                            groupName);
717    
718                    StorageType storageType = objectValuePair.getValue();
719    
720                    if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
721                            return false;
722                    }
723    
724                    String masterAddressString = getMasterAddressString(false);
725    
726                    if (_localClusterNodeAddress.equals(masterAddressString)) {
727                            return false;
728                    }
729    
730                    return true;
731            }
732    
733            protected void masterToSlave(
734                            String masterAddressString, boolean asynchronous)
735                    throws SchedulerException {
736    
737                    if (asynchronous) {
738                            MethodHandler methodHandler = new MethodHandler(
739                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
740    
741                            Address address = (Address)getDeserializedObject(
742                                    masterAddressString);
743    
744                            ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
745                                    methodHandler, address);
746    
747                            try {
748                                    ClusterExecutorUtil.execute(
749                                            clusterRequest,
750                                            new MemorySchedulerClusterResponseCallback(address), 20,
751                                            TimeUnit.SECONDS);
752    
753                                    return;
754                            }
755                            catch (Exception e) {
756                                    throw new SchedulerException(
757                                            "Unable to load scheduled jobs from cluster node " +
758                                                    address.getDescription(),
759                                            e);
760                            }
761                    }
762    
763                    List<SchedulerResponse> schedulerResponses = callMaster(
764                            masterAddressString, _getScheduledJobsMethodKey3,
765                            StorageType.MEMORY_CLUSTERED);
766    
767                    _doMasterToSlave(schedulerResponses);
768            }
769    
770            protected void removeMemoryClusteredJobs(String groupName) {
771                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
772                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
773    
774                    Iterator
775                            <Map.Entry<String,
776                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
777                                            memoryClusteredJobs.iterator();
778    
779                    while (itr.hasNext()) {
780                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
781                                    entry = itr.next();
782    
783                            ObjectValuePair<SchedulerResponse, TriggerState>
784                                    memoryClusteredJob = entry.getValue();
785    
786                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
787    
788                            if (groupName.equals(schedulerResponse.getGroupName())) {
789                                    itr.remove();
790                            }
791                    }
792            }
793    
794            protected ObjectValuePair<String, StorageType> resolveGroupName(
795                    String groupName) {
796    
797                    int index = groupName.indexOf(CharPool.POUND);
798    
799                    String storageTypeString = groupName.substring(0, index);
800    
801                    StorageType storageType = StorageType.valueOf(storageTypeString);
802    
803                    String orginalGroupName = groupName.substring(index + 1);
804    
805                    return new ObjectValuePair<String, StorageType>(
806                            orginalGroupName, storageType);
807            }
808    
809            protected void setClusterableThreadLocal(String groupName) {
810                    ObjectValuePair<String, StorageType> objectValuePair = resolveGroupName(
811                            groupName);
812    
813                    ClusterableContextThreadLocal.putThreadLocalContext(
814                            STORAGE_TYPE, objectValuePair.getValue());
815                    ClusterableContextThreadLocal.putThreadLocalContext(
816                            _PORTAL_READY, _portalReady);
817    
818                    boolean pluginReady = true;
819    
820                    if (PluginContextLifecycleThreadLocal.isInitializing() ||
821                            PluginContextLifecycleThreadLocal.isDestroying()) {
822    
823                            pluginReady = false;
824                    }
825    
826                    ClusterableContextThreadLocal.putThreadLocalContext(
827                            _PLUGIN_READY, pluginReady);
828            }
829    
830            protected void slaveToMaster() throws SchedulerException {
831                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
832    
833                    ProxyModeThreadLocal.setForceSync(true);
834    
835                    _writeLock.lock();
836    
837                    try {
838                            for (ObjectValuePair<SchedulerResponse, TriggerState>
839                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
840    
841                                    SchedulerResponse schedulerResponse =
842                                            memoryClusteredJob.getKey();
843    
844                                    _schedulerEngine.schedule(
845                                            schedulerResponse.getTrigger(),
846                                            schedulerResponse.getDescription(),
847                                            schedulerResponse.getDestinationName(),
848                                            schedulerResponse.getMessage());
849    
850                                    TriggerState triggerState = memoryClusteredJob.getValue();
851    
852                                    if (triggerState.equals(TriggerState.PAUSED)) {
853                                            _schedulerEngine.pause(
854                                                    schedulerResponse.getJobName(),
855                                                    schedulerResponse.getGroupName());
856                                    }
857                            }
858    
859                            _memoryClusteredJobs.clear();
860                    }
861                    finally {
862                            ProxyModeThreadLocal.setForceSync(forceSync);
863    
864                            _master = true;
865    
866                            _writeLock.unlock();
867                    }
868            }
869    
870            protected void updateMemoryClusteredJob(
871                    String jobName, String groupName, TriggerState triggerState) {
872    
873                    ObjectValuePair<SchedulerResponse, TriggerState>
874                            memoryClusteredJob = _memoryClusteredJobs.get(
875                                    getFullName(jobName, groupName));
876    
877                    if (memoryClusteredJob != null) {
878                            memoryClusteredJob.setValue(triggerState);
879                    }
880            }
881    
882            protected void updateMemoryClusteredJobs(
883                    String groupName, TriggerState triggerState) {
884    
885                    for (ObjectValuePair<SchedulerResponse, TriggerState>
886                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
887    
888                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
889    
890                            if (groupName.equals(schedulerResponse.getGroupName())) {
891                                    memoryClusteredJob.setValue(triggerState);
892                            }
893                    }
894            }
895    
        /**
         * The scheduler engine bean referenced by the name
         * <code>com.liferay.portal.scheduler.ClusterSchedulerEngineService</code>.
         *
         * NOTE(review): the methods in this part of the class operate on the
         * private <code>_schedulerEngine</code> field rather than on this one;
         * confirm how the two fields relate before relying on either.
         */
        @BeanReference(
                name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
        protected SchedulerEngine schedulerEngine;
899    
900            private void _doMasterToSlave(List<SchedulerResponse> schedulerResponses)
901                    throws SchedulerException {
902    
903                    _writeLock.lock();
904    
905                    try {
906                            for (SchedulerResponse schedulerResponse :
907                                            _schedulerEngine.getScheduledJobs()) {
908    
909                                    if (StorageType.MEMORY_CLUSTERED ==
910                                                    schedulerResponse.getStorageType()) {
911    
912                                            String groupName = StorageType.MEMORY_CLUSTERED.toString();
913    
914                                            groupName = groupName.concat(StringPool.POUND).concat(
915                                                    schedulerResponse.getGroupName());
916    
917                                            _schedulerEngine.delete(
918                                                    schedulerResponse.getJobName(), groupName);
919                                    }
920                            }
921    
922                            initMemoryClusteredJobs(schedulerResponses);
923    
924                            if (_log.isInfoEnabled()) {
925                                    _log.info("Switched current node from master to slave");
926                            }
927                    }
928                    catch (Exception e) {
929                            throw new SchedulerException(e);
930                    }
931                    finally {
932                            _master = false;
933    
934                            _writeLock.unlock();
935                    }
936            }
937    
938            private static final String _LOCK_CLASS_NAME =
939                    SchedulerEngine.class.getName();
940    
941            private static final String _PLUGIN_READY = "plugin.ready";
942    
943            private static final String _PORTAL_READY = "portal.ready";
944    
945            private static Log _log = LogFactoryUtil.getLog(
946                    ClusterSchedulerEngine.class);
947    
948            private static MethodKey _getScheduledJobMethodKey = new MethodKey(
949                    SchedulerEngineHelperUtil.class.getName(), "getScheduledJob",
950                    String.class, String.class, StorageType.class);
951            private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
952                    SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs");
953            private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
954                    SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs",
955                    String.class, StorageType.class);
956            private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
957                    SchedulerEngineHelperUtil.class.getName(), "getScheduledJobs",
958                    StorageType.class);
959    
960            private String _beanIdentifier;
961            private ClusterEventListener _clusterEventListener;
962            private volatile String _localClusterNodeAddress;
963            private volatile boolean _master;
964            private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
965                    _memoryClusteredJobs = new ConcurrentHashMap
966                            <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
967            private boolean _portalReady;
968            private java.util.concurrent.locks.Lock _readLock;
969            private SchedulerEngine _schedulerEngine;
970            private java.util.concurrent.locks.Lock _writeLock;
971    
972            private static class SchedulerClusterInvokeAcceptor
973                    implements ClusterInvokeAcceptor {
974    
975                    @Override
976                    public boolean accept(Map<String, Serializable> context) {
977                            if (ClusterInvokeThreadLocal.isEnabled()) {
978                                    return true;
979                            }
980    
981                            StorageType storageType = (StorageType)context.get(STORAGE_TYPE);
982                            boolean portalReady = (Boolean)context.get(_PORTAL_READY);
983                            boolean pluginReady = (Boolean)context.get(_PLUGIN_READY);
984    
985                            if (storageType.equals(StorageType.PERSISTED) || !portalReady ||
986                                    !pluginReady) {
987    
988                                    return false;
989                            }
990    
991                            return true;
992                    }
993    
994            }
995    
996            private class MemorySchedulerClusterEventListener
997                    implements ClusterEventListener {
998    
999                    @Override
1000                    public void processClusterEvent(ClusterEvent clusterEvent) {
1001                            try {
1002                                    getMasterAddressString(true);
1003                            }
1004                            catch (Exception e) {
1005                                    _log.error("Unable to update memory scheduler cluster lock", e);
1006                            }
1007                    }
1008    
1009            }
1010    
1011            private class MemorySchedulerClusterResponseCallback
1012                    extends BaseClusterResponseCallback {
1013    
1014                    public MemorySchedulerClusterResponseCallback(Address address) {
1015                            _address = address;
1016                    }
1017    
1018                    @Override
1019                    public void callback(ClusterNodeResponses clusterNodeResponses) {
1020                            try {
1021                                    ClusterNodeResponse clusterNodeResponse =
1022                                            clusterNodeResponses.getClusterResponse(_address);
1023    
1024                                    List<SchedulerResponse> schedulerResponses =
1025                                            (List<SchedulerResponse>)clusterNodeResponse.getResult();
1026    
1027                                    _doMasterToSlave(schedulerResponses);
1028                            }
1029                            catch (Exception e) {
1030                                    _log.error(
1031                                            "Unable to load memory clustered jobs from cluster node " +
1032                                                    _address.getDescription(),
1033                                            e);
1034                            }
1035                    }
1036    
1037                    @Override
1038                    public void processTimeoutException(TimeoutException timeoutException) {
1039                            _log.error(
1040                                    "Unable to load memory clustered jobs from cluster node " +
1041                                            _address.getDescription(),
1042                                    timeoutException);
1043                    }
1044    
1045                    private Address _address;
1046    
1047            }
1048    
1049    }