001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.bam.processor;
018    
019    import java.util.Date;
020    import java.util.HashMap;
021    import java.util.List;
022    import java.util.Map;
023    import javax.persistence.EntityManager;
024    import javax.persistence.LockModeType;
025    import javax.persistence.PersistenceException;
026    
027    import org.apache.camel.bam.QueryUtils;
028    import org.apache.camel.bam.model.ActivityState;
029    import org.apache.camel.bam.rules.ProcessRules;
030    import org.apache.camel.impl.ServiceSupport;
031    import org.apache.camel.util.CastUtils;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.springframework.orm.jpa.JpaCallback;
035    import org.springframework.orm.jpa.JpaTemplate;
036    import org.springframework.transaction.TransactionStatus;
037    import org.springframework.transaction.support.TransactionCallbackWithoutResult;
038    import org.springframework.transaction.support.TransactionTemplate;
039    import org.springframework.util.ClassUtils;
040    
041    /**
042     * A timer engine to monitor for expired activities and perform whatever actions
043     * are required.
044     * 
045     * @version $Revision: 17033 $
046     */
047    public class ActivityMonitorEngine extends ServiceSupport implements Runnable {
048        private static final Log LOG = LogFactory.getLog(ActivityMonitorEngine.class);
049        private JpaTemplate template;
050        private TransactionTemplate transactionTemplate;
051        private ProcessRules rules;
052        private long windowMillis = 1000L;
053        private Thread thread;
054        private boolean useLocking;
055    
056        public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) {
057            this.template = template;
058            this.transactionTemplate = transactionTemplate;
059            this.rules = rules;
060        }
061    
062        public boolean isUseLocking() {
063            return useLocking;
064        }
065    
066        public void setUseLocking(boolean useLocking) {
067            this.useLocking = useLocking;
068        }
069    
070        public void run() {
071            LOG.debug("Starting to poll for timeout events");
072    
073            while (!isStopped()) {
074                try {
075                    long now = System.currentTimeMillis();
076                    long nextPoll = now + windowMillis;
077                    final Date timeNow = new Date(now);
078    
079                    transactionTemplate.execute(new TransactionCallbackWithoutResult() {
080                        protected void doInTransactionWithoutResult(TransactionStatus status) {
081                            Map<String, Object> params = new HashMap<String, Object>(1);
082                            params.put("timeNow", timeNow);
083    
084                            List<ActivityState> list = CastUtils.cast(template.findByNamedParams("select x from "
085                                    + QueryUtils.getTypeName(ActivityState.class) + " x where x.timeOverdue < :timeNow", params));
086                            for (ActivityState activityState : list) {
087                                fireExpiredEvent(activityState);
088                            }
089                        }
090                    });
091    
092                    long timeToSleep = nextPoll - System.currentTimeMillis();
093                    if (timeToSleep > 0) {
094                        if (LOG.isDebugEnabled()) {
095                            LOG.debug("Sleeping for " + timeToSleep + " millis");
096                        }
097                        try {
098                            Thread.sleep(timeToSleep);
099                        } catch (InterruptedException e) {
100                            LOG.debug("Caught: " + e, e);
101                        }
102                    }
103                } catch (Exception e) {
104                    LOG.error("Caught: " + e, e);
105                }
106            }
107        }
108    
109        protected void fireExpiredEvent(final ActivityState activityState) {
110            if (LOG.isDebugEnabled()) {
111                LOG.debug("Trying to fire expiration of: " + activityState);
112            }
113    
114            template.execute(new JpaCallback() {
115                public Object doInJpa(EntityManager entityManager) throws PersistenceException {
116                    // lets try lock the object first
117                    if (isUseLocking()) {
118                        LOG.info("Attempting to lock: " + activityState);
119                        entityManager.lock(activityState, LockModeType.WRITE);
120                        LOG.info("Grabbed lock: " + activityState);
121                    }
122    
123                    try {
124                        rules.processExpired(activityState);
125                    } catch (Exception e) {
126                        LOG.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e);
127                    }
128                    activityState.setTimeOverdue(null);
129                    //activityState.setEscalationLevel(escalateLevel + 1);
130                    return null;
131                }
132            });
133        }
134    
135        protected void doStart() throws Exception {
136            rules.start();
137            thread = new Thread(this, "ActivityMonitorEngine");
138            thread.start();
139        }
140    
141        protected void doStop() throws Exception {
142            if (thread != null) {
143                thread = null;
144            }
145            rules.stop();
146        }
147    }