001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.thread;
018    
/**
 * A Valve is a synchronization object used to enable or disable the "flow" of
 * concurrent processing.
 * <p>
 * Workers bracket their work with {@link #increment()} and
 * {@link #decrement()}; a controller calls {@link #turnOff()} to wait for all
 * in-flight work to drain and to hold back new work, and {@link #turnOn()} to
 * let work flow again. All state is guarded by a single private monitor, so
 * instances may be shared between threads.
 * 
 * @version $Revision: 1.2 $
 */
public final class Valve {

    /** Monitor guarding every field below; also the wait/notify condition. */
    private final Object mutex = new Object();
    /** Whether the valve currently lets {@link #increment()} callers through. */
    private boolean on;
    /** Number of threads currently inside {@link #turnOff()}. */
    private int turningOff;
    /** Number of {@link #increment()} calls not yet matched by {@link #decrement()}. */
    private int usage;

    /**
     * Creates a valve in the given initial state.
     * 
     * @param on {@code true} to start with the valve on, {@code false} to
     *            start with it off
     */
    public Valve(boolean on) {
        this.on = on;
    }

    /**
     * Turns the valve on. This method blocks until the valve is off.
     * 
     * @throws InterruptedException if wait is interrupted
     */
    public void turnOn() throws InterruptedException {
        synchronized (mutex) {
            while (on) {
                mutex.wait();
            }
            on = true;
            // Wake threads blocked in increment() or turnOff() waiting for "on".
            mutex.notifyAll();
        }
    }

    /**
     * Reports the current state of the valve.
     * 
     * @return {@code true} if the valve is on
     */
    public boolean isOn() {
        synchronized (mutex) {
            return on;
        }
    }

    /**
     * Turns the valve off. This method blocks until the valve is on and the
     * valve is not in use. While this method is waiting, new callers of
     * {@link #increment()} are held back.
     * 
     * @throws InterruptedException if wait is interrupted; the valve state is
     *             left unchanged in that case
     * @throws IllegalStateException if the internal turning-off counter is
     *             negative
     */
    public void turnOff() throws InterruptedException {
        synchronized (mutex) {
            if (turningOff < 0) {
                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
            }
            ++turningOff;
            try {
                while (usage > 0 || !on) {
                    mutex.wait();
                }
                on = false;
            } finally {
                --turningOff;
                // Notify on every exit path. On success this wakes turnOn()
                // waiters; on interruption it wakes increment() callers that
                // were held back only because turningOff was > 0 and would
                // otherwise never learn that the condition has cleared.
                mutex.notifyAll();
            }
        }
    }

    /**
     * Increments the use counter of the valve. This method blocks if the valve
     * is off, or is being turned off.
     * 
     * @throws InterruptedException if wait is interrupted
     * @throws IllegalStateException if an internal counter is negative
     */
    public void increment() throws InterruptedException {
        synchronized (mutex) {
            if (turningOff < 0) {
                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
            }
            if (usage < 0) {
                throw new IllegalStateException("Unbalanced usage: " + usage);
            }
            // Do we have to wait for the valve to be on?
            while (turningOff > 0 || !on) {
                mutex.wait();
            }
            usage++;
        }
    }

    /**
     * Decrements the use counter of the valve. If a thread is waiting in
     * {@link #turnOff()} and this call releases the last use, that thread is
     * woken up.
     * 
     * @throws IllegalStateException if there is no matching
     *             {@link #increment()} (the counter would become negative) or
     *             the internal turning-off counter is negative; the use counter
     *             is left unchanged in that case
     */
    public void decrement() {
        synchronized (mutex) {
            if (turningOff < 0) {
                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
            }
            // Validate before mutating so that an unbalanced call does not
            // leave the counter negative and poison every later increment().
            if (usage < 1) {
                throw new IllegalStateException("Unbalanced usage: " + (usage - 1));
            }
            usage--;
            if (turningOff > 0 && usage < 1) {
                // Last user is gone: let the pending turnOff() proceed.
                mutex.notifyAll();
            }
        }
    }

}