001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.cluster;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.BrokerFilter;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.region.Subscription;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.ConsumerId;
028    import org.apache.activemq.command.ConsumerInfo;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * Monitors for client connections that may fail to another broker - but this
034     * broker isn't aware they've gone. Can occur with network glitches or client
035     * error
036     * 
037     * @version $Revision$
038     */
039    public class ConnectionSplitBroker extends BrokerFilter{
040        private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class);
041        private List<ConsumerInfo>networkConsumerList = new ArrayList<ConsumerInfo>();
042        public ConnectionSplitBroker(Broker next) {
043            super(next);
044        }
045    
046            
047        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
048                throws Exception {
049            ActiveMQDestination dest = info.getDestination();
050    
051            synchronized (networkConsumerList) {
052                if (info.isNetworkSubscription()) {
053                    networkConsumerList.add(info);
054                } else {
055                    if (!networkConsumerList.isEmpty()) {
056                        List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
057                        for (ConsumerInfo nc : networkConsumerList) {
058                            if (!nc.isNetworkConsumersEmpty()) {
059                                
060                                for (ConsumerId id : nc.getNetworkConsumerIds()) {
061                                    
062                                    if (id.equals(info.getConsumerId())) {
063                                        nc.removeNetworkConsumerId(id);
064                                        if (nc.isNetworkConsumersEmpty()) {
065                                            gcList.add(nc);
066                                        }
067                                    }
068                                }
069                            }
070                        }
071                        for (ConsumerInfo nc : gcList) {
072                            networkConsumerList.remove(nc);
073                            super.removeConsumer(context, nc);
074                            LOG.warn("Removed stale network consumer " + nc);
075                        }
076                    }
077                }
078            }
079    
080            return super.addConsumer(context, info);
081        }
082    
083        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
084            if (info.isNetworkSubscription()) {
085    
086                synchronized (networkConsumerList) {
087                    networkConsumerList.remove(info);
088                }
089            }
090            super.removeConsumer(context, info);
091        }
092    }