001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Processor;
021    
022    /**
023     * A <a href="http://camel.apache.org/throttler.html">Throttler</a>
024     * will set a limit on the maximum number of message exchanges which can be sent
025     * to a processor within a specific time period. <p/> This pattern can be
026     * extremely useful if you have some external system which meters access; such
027     * as only allowing 100 requests per second; or if huge load can cause a
028     * particular system to malfunction or to reduce its throughput you might want
029     * to introduce some throttling.
030     * 
031     * @version $Revision: 16365 $
032     */
033    public class Throttler extends DelayProcessorSupport implements Traceable {
034        private long maximumRequestsPerPeriod;
035        private long timePeriodMillis;
036        private TimeSlot slot;
037    
038        public Throttler(Processor processor, long maximumRequestsPerPeriod) {
039            this(processor, maximumRequestsPerPeriod, 1000);
040        }
041    
042        public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
043            super(processor);
044            this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
045            this.timePeriodMillis = timePeriodMillis;
046        }
047    
048        @Override
049        public String toString() {
050            return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: "
051                   + getProcessor() + "]";
052        }
053    
054        public String getTraceLabel() {
055            return "throttle[" + maximumRequestsPerPeriod + " per: " + timePeriodMillis + "]";
056        }
057    
058        // Properties
059        // -----------------------------------------------------------------------
060        public long getMaximumRequestsPerPeriod() {
061            return maximumRequestsPerPeriod;
062        }
063    
064        /**
065         * Sets the maximum number of requests per time period
066         */
067        public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
068            this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
069        }
070    
071        public long getTimePeriodMillis() {
072            return timePeriodMillis;
073        }
074    
075        /**
076         * Sets the time period during which the maximum number of requests apply
077         */
078        public void setTimePeriodMillis(long timePeriodMillis) {
079            this.timePeriodMillis = timePeriodMillis;
080        }
081    
082        // Implementation methods
083        // -----------------------------------------------------------------------
084        protected void delay(Exchange exchange) throws Exception {
085            TimeSlot slot = nextSlot();
086            if (!slot.isActive()) {
087                waitUntil(slot.startTime, exchange);
088            }
089        }
090        
091        /*
092         * Determine what the next available time slot is for handling an Exchange
093         */
094        protected synchronized TimeSlot nextSlot() {
095            if (slot == null) {
096                slot = new TimeSlot();
097            }
098            if (slot.isFull()) {
099                slot = slot.next();
100            }
101            slot.assign();
102            return slot;
103        }
104    
105        /*
106        * A time slot is capable of handling a number of exchanges within a certain period of time.
107        */
108        protected class TimeSlot {
109            
110            private long capacity = Throttler.this.maximumRequestsPerPeriod;
111            private final long duration = Throttler.this.timePeriodMillis;
112            private final long startTime;
113    
114            protected TimeSlot() {
115                this(System.currentTimeMillis());
116            }
117    
118            protected TimeSlot(long startTime) {
119                this.startTime = startTime;
120            }
121            
122            protected void assign() {
123                capacity--;
124            }
125            
126            /*
127             * Start the next time slot either now or in the future
128             * (no time slots are being created in the past)
129             */
130            protected TimeSlot next() {
131                return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
132            }
133            
134            protected boolean isActive() {
135                return startTime <= System.currentTimeMillis();
136            }
137            
138            protected boolean isFull() {
139                return capacity <= 0;
140            }        
141        }
142    }