001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.net.UnknownHostException;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    
035    import javax.management.MBeanServer;
036    import javax.management.MalformedObjectNameException;
037    import javax.management.ObjectName;
038    
039    import org.apache.activemq.ActiveMQConnectionMetaData;
040    import org.apache.activemq.Service;
041    import org.apache.activemq.advisory.AdvisoryBroker;
042    import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
043    import org.apache.activemq.broker.ft.MasterConnector;
044    import org.apache.activemq.broker.jmx.BrokerView;
045    import org.apache.activemq.broker.jmx.ConnectorView;
046    import org.apache.activemq.broker.jmx.ConnectorViewMBean;
047    import org.apache.activemq.broker.jmx.FTConnectorView;
048    import org.apache.activemq.broker.jmx.JmsConnectorView;
049    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
050    import org.apache.activemq.broker.jmx.ManagementContext;
051    import org.apache.activemq.broker.jmx.NetworkConnectorView;
052    import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
053    import org.apache.activemq.broker.jmx.ProxyConnectorView;
054    import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
055    import org.apache.activemq.broker.region.Destination;
056    import org.apache.activemq.broker.region.DestinationFactory;
057    import org.apache.activemq.broker.region.DestinationFactoryImpl;
058    import org.apache.activemq.broker.region.DestinationInterceptor;
059    import org.apache.activemq.broker.region.RegionBroker;
060    import org.apache.activemq.broker.region.policy.PolicyMap;
061    import org.apache.activemq.broker.region.virtual.MirroredQueue;
062    import org.apache.activemq.broker.region.virtual.VirtualDestination;
063    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
064    import org.apache.activemq.broker.region.virtual.VirtualTopic;
065    import org.apache.activemq.command.ActiveMQDestination;
066    import org.apache.activemq.command.BrokerId;
067    import org.apache.activemq.kaha.Store;
068    import org.apache.activemq.kaha.StoreFactory;
069    import org.apache.activemq.network.ConnectionFilter;
070    import org.apache.activemq.network.DiscoveryNetworkConnector;
071    import org.apache.activemq.network.NetworkConnector;
072    import org.apache.activemq.network.jms.JmsConnector;
073    import org.apache.activemq.proxy.ProxyConnector;
074    import org.apache.activemq.security.MessageAuthorizationPolicy;
075    import org.apache.activemq.security.SecurityContext;
076    import org.apache.activemq.selector.SelectorParser;
077    import org.apache.activemq.store.PersistenceAdapter;
078    import org.apache.activemq.store.PersistenceAdapterFactory;
079    import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
080    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
081    import org.apache.activemq.thread.TaskRunnerFactory;
082    import org.apache.activemq.transport.TransportFactory;
083    import org.apache.activemq.transport.TransportServer;
084    import org.apache.activemq.transport.vm.VMTransportFactory;
085    import org.apache.activemq.usage.SystemUsage;
086    import org.apache.activemq.util.IOExceptionSupport;
087    import org.apache.activemq.util.IOHelper;
088    import org.apache.activemq.util.JMXSupport;
089    import org.apache.activemq.util.ServiceStopper;
090    import org.apache.activemq.util.URISupport;
091    import org.apache.commons.logging.Log;
092    import org.apache.commons.logging.LogFactory;
093    
094    /**
095     * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
096     * number of transport connectors, network connectors and a bunch of properties
097     * which can be used to configure the broker as it is lazily created.
098     * 
099     * @version $Revision: 1.1 $
100     */
101    public class BrokerService implements Service {
102            protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
103        public static final String DEFAULT_PORT = "61616";
104        public static final String LOCAL_HOST_NAME;
105        public static final String DEFAULT_BROKER_NAME = "localhost";
106    
107        private static final Log LOG = LogFactory.getLog(BrokerService.class);
108        private static final long serialVersionUID = 7353129142305630237L;
109    
110        private boolean useJmx = true;
111        private boolean enableStatistics = true;
112        private boolean persistent = true;
113        private boolean populateJMSXUserID;
114        private boolean useShutdownHook = true;
115        private boolean useLoggingForShutdownErrors;
116        private boolean shutdownOnMasterFailure;
117        private boolean shutdownOnSlaveFailure;
118        private boolean waitForSlave;
119        private String brokerName = DEFAULT_BROKER_NAME;
120        private File dataDirectoryFile;
121        private File tmpDataDirectory;
122        private Broker broker;
123        private BrokerView adminView;
124        private ManagementContext managementContext;
125        private ObjectName brokerObjectName;
126        private TaskRunnerFactory taskRunnerFactory;
127        private TaskRunnerFactory persistenceTaskRunnerFactory;
128        private SystemUsage systemUsage;
129        private SystemUsage producerSystemUsage;
130        private SystemUsage consumerSystemUsaage;
131        private PersistenceAdapter persistenceAdapter;
132        private PersistenceAdapterFactory persistenceFactory;
133        protected DestinationFactory destinationFactory;
134        private MessageAuthorizationPolicy messageAuthorizationPolicy;
135        private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
136        private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
137        private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
138        private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
139        private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
140        private List<Service> services = new ArrayList<Service>();
141        private MasterConnector masterConnector;
142        private String masterConnectorURI;
143        private transient Thread shutdownHook;
144        private String[] transportConnectorURIs;
145        private String[] networkConnectorURIs;
146        private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
147        // to other jms messaging
148        // systems
149        private boolean deleteAllMessagesOnStartup;
150        private boolean advisorySupport = true;
151        private URI vmConnectorURI;
152        private PolicyMap destinationPolicy;
153        private AtomicBoolean started = new AtomicBoolean(false);
154        private AtomicBoolean stopped = new AtomicBoolean(false);
155        private BrokerPlugin[] plugins;
156        private boolean keepDurableSubsActive = true;
157        private boolean useVirtualTopics = true;
158        private boolean useMirroredQueues = false;
159        private boolean useTempMirroredQueues=true;
160        private BrokerId brokerId;
161        private DestinationInterceptor[] destinationInterceptors;
162        private ActiveMQDestination[] destinations;
163        private Store tempDataStore;
164        private int persistenceThreadPriority = Thread.MAX_PRIORITY;
165        private boolean useLocalHostBrokerName;
166        private CountDownLatch stoppedLatch = new CountDownLatch(1);
167        private CountDownLatch startedLatch = new CountDownLatch(1);
168        private boolean supportFailOver;
169        private Broker regionBroker;
170        private int producerSystemUsagePortion = 60;
171        private int consumerSystemUsagePortion = 40;
172        private boolean splitSystemUsageForProducersConsumers;
173        private boolean monitorConnectionSplits=false;
174        private int taskRunnerPriority = Thread.NORM_PRIORITY;
175        private boolean dedicatedTaskRunner;
176        private boolean cacheTempDestinations=false;//useful for failover
177        private int timeBeforePurgeTempDestinations = 5000;
178        private List<Runnable> shutdownHooks= new ArrayList<Runnable>();
179        private boolean systemExitOnShutdown;
180        private int systemExitOnShutdownExitCode;
181        private SslContext sslContext;
182        
183        private boolean forceStart = false;
184        
// Resolves the local host name once when the class is loaded. If the lookup
// fails, LOCAL_HOST_NAME falls back to "localhost".
static {
    String resolvedName = "localhost";
    try {
        resolvedName = java.net.InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
        // Keep the fallback, but record why resolution failed.
        LOG.error("Failed to resolve localhost", e);
    }
    LOCAL_HOST_NAME = resolvedName;
}
194    
195        @Override
196        public String toString() {
197            return "BrokerService[" + getBrokerName() + "]";
198        }
199    
200        /**
201         * Adds a new transport connector for the given bind address
202         * 
203         * @return the newly created and added transport connector
204         * @throws Exception
205         */
206        public TransportConnector addConnector(String bindAddress) throws Exception {
207            return addConnector(new URI(bindAddress));
208        }
209    
210        /**
211         * Adds a new transport connector for the given bind address
212         * 
213         * @return the newly created and added transport connector
214         * @throws Exception
215         */
216        public TransportConnector addConnector(URI bindAddress) throws Exception {
217            return addConnector(createTransportConnector(bindAddress));
218        }
219    
220        /**
221         * Adds a new transport connector for the given TransportServer transport
222         * 
223         * @return the newly created and added transport connector
224         * @throws Exception
225         */
226        public TransportConnector addConnector(TransportServer transport) throws Exception {
227            return addConnector(new TransportConnector(transport));
228        }
229    
230        /**
231         * Adds a new transport connector
232         * 
233         * @return the transport connector
234         * @throws Exception
235         */
236        public TransportConnector addConnector(TransportConnector connector) throws Exception {
237    
238            transportConnectors.add(connector);
239    
240            return connector;
241        }
242    
243        /**
244         * Stops and removes a transport connector from the broker.
245         * 
246         * @param connector
247         * @return true if the connector has been previously added to the broker
248         * @throws Exception
249         */
250        public boolean removeConnector(TransportConnector connector) throws Exception {
251            boolean rc = transportConnectors.remove(connector);
252            if (rc) {
253                unregisterConnectorMBean(connector);
254            }
255            return rc;
256    
257        }
258    
259        /**
260         * Adds a new network connector using the given discovery address
261         * 
262         * @return the newly created and added network connector
263         * @throws Exception
264         */
265        public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
266            return addNetworkConnector(new URI(discoveryAddress));
267        }
268    
269        /**
270         * Adds a new proxy connector using the given bind address
271         * 
272         * @return the newly created and added network connector
273         * @throws Exception
274         */
275        public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
276            return addProxyConnector(new URI(bindAddress));
277        }
278    
279        /**
280         * Adds a new network connector using the given discovery address
281         * 
282         * @return the newly created and added network connector
283         * @throws Exception
284         */
285        public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
286            if (!isAdvisorySupport()) {
287                throw new javax.jms.IllegalStateException("Networks require advisory messages to function - advisories are currently disabled");
288            }
289            NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
290            return addNetworkConnector(connector);
291        }
292    
293        /**
294         * Adds a new proxy connector using the given bind address
295         * 
296         * @return the newly created and added network connector
297         * @throws Exception
298         */
299        public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
300            ProxyConnector connector = new ProxyConnector();
301            connector.setBind(bindAddress);
302            connector.setRemote(new URI("fanout:multicast://default"));
303            return addProxyConnector(connector);
304        }
305    
306        /**
307         * Adds a new network connector to connect this broker to a federated
308         * network
309         */
310        public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
311            connector.setBrokerService(this);
312            URI uri = getVmConnectorURI();
313            Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
314            map.put("network", "true");
315            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
316            connector.setLocalUri(uri);
317    
318            // Set a connection filter so that the connector does not establish loop
319            // back connections.
320            connector.setConnectionFilter(new ConnectionFilter() {
321                public boolean connectTo(URI location) {
322                    List<TransportConnector> transportConnectors = getTransportConnectors();
323                    for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
324                        try {
325                            TransportConnector tc = iter.next();
326                            if (location.equals(tc.getConnectUri())) {
327                                return false;
328                            }
329                        } catch (Throwable e) {
330                        }
331                    }
332                    return true;
333                }
334            });
335    
336            networkConnectors.add(connector);
337            if (isUseJmx()) {
338                registerNetworkConnectorMBean(connector);
339            }
340            return connector;
341        }
342    
343        /**
344         * Removes the given network connector without stopping it. The caller
345         * should call {@link NetworkConnector#stop()} to close the connector
346         */
347        public boolean removeNetworkConnector(NetworkConnector connector) {
348            boolean answer = networkConnectors.remove(connector);
349            if (answer) {
350                unregisterNetworkConnectorMBean(connector);
351            }
352            return answer;
353        }
354    
355        public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
356            URI uri = getVmConnectorURI();
357            connector.setLocalUri(uri);
358            proxyConnectors.add(connector);
359            if (isUseJmx()) {
360                registerProxyConnectorMBean(connector);
361            }
362            return connector;
363        }
364    
365        public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
366            connector.setBrokerService(this);
367            jmsConnectors.add(connector);
368            if (isUseJmx()) {
369                registerJmsConnectorMBean(connector);
370            }
371            return connector;
372        }
373    
374        public JmsConnector removeJmsConnector(JmsConnector connector) {
375            if (jmsConnectors.remove(connector)) {
376                return connector;
377            }
378            return null;
379        }
380    
381        /**
382         * @return Returns the masterConnectorURI.
383         */
384        public String getMasterConnectorURI() {
385            return masterConnectorURI;
386        }
387    
388        /**
389         * @param masterConnectorURI The masterConnectorURI to set.
390         */
391        public void setMasterConnectorURI(String masterConnectorURI) {
392            this.masterConnectorURI = masterConnectorURI;
393        }
394    
395        /**
396         * @return true if this Broker is a slave to a Master
397         */
398        public boolean isSlave() {
399            return masterConnector != null && masterConnector.isSlave();
400        }
401    
402        public void masterFailed() {
403            if (shutdownOnMasterFailure) {
404                LOG.fatal("The Master has failed ... shutting down");
405                try {
406                    stop();
407                } catch (Exception e) {
408                    LOG.error("Failed to stop for master failure", e);
409                }
410            } else {
411                LOG.warn("Master Failed - starting all connectors");
412                try {
413                    startAllConnectors();
414                    broker.nowMasterBroker();
415                } catch (Exception e) {
416                    LOG.error("Failed to startAllConnectors");
417                }
418            }
419        }
420    
421        public boolean isStarted() {
422            return started.get();
423        }
424    
425        public void start(boolean force) throws Exception {
426            forceStart = force;
427            start();
428        }
429        
430        // Service interface
431        // -------------------------------------------------------------------------
432        public void start() throws Exception {
433            if (!started.compareAndSet(false, true)) {
434                // lets just ignore redundant start() calls
435                // as its way too easy to not be completely sure if start() has been
436                // called or not with the gazillion of different configuration
437                // mechanisms
438    
439                // throw new IllegalStateException("Allready started.");
440                return;
441            }
442    
443            try {
444                
445                if( systemExitOnShutdown ) {
446                    addShutdownHook(new Runnable(){
447                        public void run() {
448                            System.exit(systemExitOnShutdownExitCode);
449                        }
450                    });
451                }
452                
453                processHelperProperties();
454    
455    
456                
457                getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
458                getPersistenceAdapter().setBrokerName(getBrokerName());
459                LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
460                if (deleteAllMessagesOnStartup) {
461                    deleteAllMessages();
462                }
463                getPersistenceAdapter().start();
464    
465                startDestinations();
466    
467                addShutdownHook();
468                
469                getBroker().start();
470                
471                if (isUseJmx()) {
472                    getManagementContext().start();
473                    ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
474                    managedBroker.setContextBroker(broker);
475                    adminView = new BrokerView(this, managedBroker);
476                    MBeanServer mbeanServer = getManagementContext().getMBeanServer();
477                    if (mbeanServer != null) {
478                        ObjectName objectName = getBrokerObjectName();
479                        mbeanServer.registerMBean(adminView, objectName);
480                        registeredMBeanNames.add(objectName);
481                    }
482                }
483                
484                BrokerRegistry.getInstance().bind(getBrokerName(), this);
485                
486               // see if there is a MasterBroker service and if so, configure
487                // it and start it.
488                for (Service service : services) {
489                    if (service instanceof MasterConnector) {
490                        configureService(service);
491                        service.start();
492                    }
493                } 
494                if (!isSlave()) {
495                    startAllConnectors();
496                }
497    
498                if (isUseJmx() && masterConnector != null) {
499                    registerFTConnectorMBean(masterConnector);
500                }
501    
502                brokerId = broker.getBrokerId();
503                LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
504                getBroker().brokerServiceStarted();
505                startedLatch.countDown();
506            } catch (Exception e) {
507                LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
508                try{
509                    stop();
510                }catch(Exception ex) {
511                    LOG.warn("Failed to stop broker after failure in start ",ex);
512                }
513                throw e;
514            }
515        }
516    
517        public void stop() throws Exception {
518            if (!started.compareAndSet(true, false)) {
519                return;
520            }
521            LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
522            removeShutdownHook();
523            ServiceStopper stopper = new ServiceStopper();
524            if (services != null) {
525                for (Service service: services) {
526                    stopper.stop(service);
527                }
528            }
529            stopAllConnectors(stopper);
530            // remove any VMTransports connected
531            // this has to be done after services are stopped,
532            // to avoid timimg issue with discovery (spinning up a new instance)
533            BrokerRegistry.getInstance().unbind(getBrokerName());
534            VMTransportFactory.stopped(getBrokerName());        
535            if (broker != null) {
536                stopper.stop(broker);
537            }
538            if (tempDataStore != null) {
539                tempDataStore.close();
540            }
541            stopper.stop(persistenceAdapter);
542            if (isUseJmx()) {
543                MBeanServer mbeanServer = getManagementContext().getMBeanServer();
544                if (mbeanServer != null) {
545                    for (Iterator<ObjectName> iter = registeredMBeanNames.iterator(); iter.hasNext();) {
546                        ObjectName name = iter.next();
547                        try {
548                            mbeanServer.unregisterMBean(name);
549                        } catch (Exception e) {
550                            stopper.onException(mbeanServer, e);
551                        }
552                    }
553                }
554                registeredMBeanNames.clear();
555                stopper.stop(getManagementContext());
556            }
557            // Clear SelectorParser cache to free memory
558            SelectorParser.clearCache();
559            stopped.set(true);
560            stoppedLatch.countDown();
561            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
562            synchronized(shutdownHooks) {
563                for (Runnable hook : shutdownHooks) {
564                    try {
565                        hook.run();
566                    } catch ( Throwable e ) {
567                        stopper.onException(hook, e);
568                    }
569                }
570            }
571            stopper.throwFirstException();
572        }
573    
574        /**
575         * A helper method to block the caller thread until the broker has been
576         * stopped
577         */
578        public void waitUntilStopped() {
579            while (isStarted() && !stopped.get()) {
580                try {
581                    stoppedLatch.await();
582                } catch (InterruptedException e) {
583                    // ignore
584                }
585            }
586        }
587    
588        
589        /**
590         * A helper method to block the caller thread until the broker has been
591         * started
592         */
593        public void waitUntilStarted() {
594            boolean waitSucceeded = false;
595            while (isStarted() && !stopped.get() && !waitSucceeded) {
596                try {
597                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
598                } catch (InterruptedException ignore) {
599                }
600            }
601        }
602    
603        // Properties
604        // -------------------------------------------------------------------------
605    
606        /**
607         * Returns the message broker
608         */
609        public Broker getBroker() throws Exception {
610            if (broker == null) {
611                LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" + getBrokerName() + ") is starting");
612                LOG.info("For help or more information please see: http://activemq.apache.org/");
613                broker = createBroker();
614            }
615            return broker;
616        }
617    
618        /**
619         * Returns the administration view of the broker; used to create and destroy
620         * resources such as queues and topics. Note this method returns null if JMX
621         * is disabled.
622         */
623        public BrokerView getAdminView() throws Exception {
624            if (adminView == null) {
625                // force lazy creation
626                getBroker();
627            }
628            return adminView;
629        }
630    
631        public void setAdminView(BrokerView adminView) {
632            this.adminView = adminView;
633        }
634    
635        public String getBrokerName() {
636            return brokerName;
637        }
638    
639        /**
640         * Sets the name of this broker; which must be unique in the network
641         * 
642         * @param brokerName
643         */
644        public void setBrokerName(String brokerName) {
645            if (brokerName == null) {
646                throw new NullPointerException("The broker name cannot be null");
647            }
648            String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
649            if (!str.equals(brokerName)) {
650                LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
651            }
652            this.brokerName = str.trim();
653    
654        }
655    
656        public PersistenceAdapterFactory getPersistenceFactory() {
657            if (persistenceFactory == null) {
658                persistenceFactory = createPersistenceFactory();
659            }
660            return persistenceFactory;
661        }
662    
663        public File getDataDirectoryFile() {
664            if (dataDirectoryFile == null) {
665                dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
666            }
667            return dataDirectoryFile;
668        }
669    
670        public File getBrokerDataDirectory() {
671            String brokerDir = getBrokerName();
672            return new File(getDataDirectoryFile(), brokerDir);
673        }
674    
675        /**
676         * Sets the directory in which the data files will be stored by default for
677         * the JDBC and Journal persistence adaptors.
678         * 
679         * @param dataDirectory the directory to store data files
680         */
681        public void setDataDirectory(String dataDirectory) {
682            setDataDirectoryFile(new File(dataDirectory));
683        }
684    
685        /**
686         * Sets the directory in which the data files will be stored by default for
687         * the JDBC and Journal persistence adaptors.
688         * 
689         * @param dataDirectoryFile the directory to store data files
690         */
691        public void setDataDirectoryFile(File dataDirectoryFile) {
692            this.dataDirectoryFile = dataDirectoryFile;
693        }
694    
695        /**
696         * @return the tmpDataDirectory
697         */
698        public File getTmpDataDirectory() {
699            if (tmpDataDirectory == null) {
700                tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
701            }
702            return tmpDataDirectory;
703        }
704    
705        /**
706         * @param tmpDataDirectory the tmpDataDirectory to set
707         */
708        public void setTmpDataDirectory(File tmpDataDirectory) {
709            this.tmpDataDirectory = tmpDataDirectory;
710        }
711    
712        public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
713            this.persistenceFactory = persistenceFactory;
714        }
715    
716        public void setDestinationFactory(DestinationFactory destinationFactory) {
717            this.destinationFactory = destinationFactory;
718        }
719    
720        public boolean isPersistent() {
721            return persistent;
722        }
723    
724        /**
725         * Sets whether or not persistence is enabled or disabled.
726         */
727        public void setPersistent(boolean persistent) {
728            this.persistent = persistent;
729        }
730    
731        public boolean isPopulateJMSXUserID() {
732            return populateJMSXUserID;
733        }
734    
735        /**
736         * Sets whether or not the broker should populate the JMSXUserID header.
737         */
738        public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
739            this.populateJMSXUserID = populateJMSXUserID;
740        }
741    
742        public SystemUsage getSystemUsage() {
743            try {
744                if (systemUsage == null) {
745                    systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
746                    systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg
747                    systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 Gb
748                    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
749                    addService(this.systemUsage);
750                }
751                return systemUsage;
752            } catch (IOException e) {
753                LOG.fatal("Cannot create SystemUsage", e);
754                throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
755            }
756        }
757    
758        public void setSystemUsage(SystemUsage memoryManager) {
759            if (this.systemUsage != null) {
760                removeService(this.systemUsage);
761            }
762            this.systemUsage = memoryManager;
763            addService(this.systemUsage);
764        }
765    
766        /**
767         * @return the consumerUsageManager
768         * @throws IOException 
769         */
770        public SystemUsage getConsumerSystemUsage() throws IOException {
771            if (this.consumerSystemUsaage == null) {
772                if(splitSystemUsageForProducersConsumers) {
773                    this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
774                    float portion = consumerSystemUsagePortion/100f;
775                    this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
776                    addService(this.consumerSystemUsaage);
777                }else {
778                    consumerSystemUsaage=getSystemUsage();
779                }
780            }
781            return this.consumerSystemUsaage;
782        }
783    
784        /**
785         * @param consumerSystemUsaage the storeSystemUsage to set
786         */
787        public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
788            if (this.consumerSystemUsaage != null) {
789                removeService(this.consumerSystemUsaage);
790            }
791            this.consumerSystemUsaage = consumerSystemUsaage;
792            addService(this.consumerSystemUsaage);
793        }
794    
795        /**
796         * @return the producerUsageManager
797         * @throws IOException 
798         */
799        public SystemUsage getProducerSystemUsage() throws IOException {
800            if (producerSystemUsage == null ) {
801                if (splitSystemUsageForProducersConsumers) {
802                    producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
803                    float portion = producerSystemUsagePortion/100f;
804                    producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
805                    addService(producerSystemUsage);
806                }else {
807                    producerSystemUsage=getSystemUsage();
808                }
809            }
810            return producerSystemUsage;
811        }
812    
813        /**
814         * @param producerUsageManager the producerUsageManager to set
815         */
816        public void setProducerSystemUsage(SystemUsage producerUsageManager) {
817            if (this.producerSystemUsage != null) {
818                removeService(this.producerSystemUsage);
819            }
820            this.producerSystemUsage = producerUsageManager;
821            addService(this.producerSystemUsage);
822        }
823    
824        public PersistenceAdapter getPersistenceAdapter() throws IOException {
825            if (persistenceAdapter == null) {
826                persistenceAdapter = createPersistenceAdapter();
827                configureService(persistenceAdapter);
828                this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
829            }
830            return persistenceAdapter;
831        }
832    
833        /**
834         * Sets the persistence adaptor implementation to use for this broker
835         * @throws IOException 
836         */
837        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
838            this.persistenceAdapter = persistenceAdapter;
839            configureService(this.persistenceAdapter);
840            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
841            
842        }
843    
844        public TaskRunnerFactory getTaskRunnerFactory() {
845            if (taskRunnerFactory == null) {
846                taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner());
847            }
848            return taskRunnerFactory;
849        }
850    
851        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
852            this.taskRunnerFactory = taskRunnerFactory;
853        }
854    
855        public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
856            if (taskRunnerFactory == null) {
857                persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000);
858            }
859            return persistenceTaskRunnerFactory;
860        }
861    
862        public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
863            this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
864        }
865    
866        public boolean isUseJmx() {
867            return useJmx;
868        }
869    
870        public boolean isEnableStatistics() {
871            return enableStatistics;
872        }
873    
874        /**
875         * Sets whether or not the Broker's services enable statistics or not.
876         */
877        public void setEnableStatistics(boolean enableStatistics) {
878            this.enableStatistics = enableStatistics;
879        }
880    
881        /**
882         * Sets whether or not the Broker's services should be exposed into JMX or
883         * not.
884         */
885        public void setUseJmx(boolean useJmx) {
886            this.useJmx = useJmx;
887        }
888    
889        public ObjectName getBrokerObjectName() throws IOException {
890            if (brokerObjectName == null) {
891                brokerObjectName = createBrokerObjectName();
892            }
893            return brokerObjectName;
894        }
895    
896        /**
897         * Sets the JMX ObjectName for this broker
898         */
899        public void setBrokerObjectName(ObjectName brokerObjectName) {
900            this.brokerObjectName = brokerObjectName;
901        }
902    
903        public ManagementContext getManagementContext() {
904            if (managementContext == null) {
905                managementContext = new ManagementContext();
906            }
907            return managementContext;
908        }
909    
910        public void setManagementContext(ManagementContext managementContext) {
911            this.managementContext = managementContext;
912        }
913    
914        public NetworkConnector getNetworkConnectorByName(String connectorName) {
915            for(NetworkConnector connector : networkConnectors) {
916                if(connector.getName().equals(connectorName)) {
917                    return connector;
918                }
919            }
920            return null;
921        }
922    
923        public String[] getNetworkConnectorURIs() {
924            return networkConnectorURIs;
925        }
926    
927        public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
928            this.networkConnectorURIs = networkConnectorURIs;
929        }
930    
931        public TransportConnector getConnectorByName(String connectorName) {
932            for(TransportConnector connector : transportConnectors) {
933                if(connector.getName().equals(connectorName)) {
934                    return connector;
935                }
936            }
937            return null;
938        }
939    
940        public String[] getTransportConnectorURIs() {
941            return transportConnectorURIs;
942        }
943    
944        public void setTransportConnectorURIs(String[] transportConnectorURIs) {
945            this.transportConnectorURIs = transportConnectorURIs;
946        }
947    
948        /**
949         * @return Returns the jmsBridgeConnectors.
950         */
951        public JmsConnector[] getJmsBridgeConnectors() {
952            return jmsBridgeConnectors;
953        }
954    
955        /**
956         * @param jmsConnectors The jmsBridgeConnectors to set.
957         */
958        public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
959            this.jmsBridgeConnectors = jmsConnectors;
960        }
961    
962        public Service[] getServices() {
963            return (Service[]) services.toArray();
964        }
965    
966        /**
967         * Sets the services associated with this broker such as a
968         * {@link MasterConnector}
969         */
970        public void setServices(Service[] services) {
971            this.services.clear();
972            if (services != null) {
973                for (int i=0; i < services.length;i++) {
974                    this.services.add(services[i]);
975                }
976            }
977        }
978    
979        /**
980         * Adds a new service so that it will be started as part of the broker
981         * lifecycle
982         */
983        public void addService(Service service) {
984            services.add(service);
985        }
986        
987        public void removeService(Service service) {
988            services.remove(service);
989        }
990    
991        public boolean isUseLoggingForShutdownErrors() {
992            return useLoggingForShutdownErrors;
993        }
994    
995        /**
996         * Sets whether or not we should use commons-logging when reporting errors
997         * when shutting down the broker
998         */
999        public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1000            this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1001        }
1002    
1003        public boolean isUseShutdownHook() {
1004            return useShutdownHook;
1005        }
1006    
1007        /**
1008         * Sets whether or not we should use a shutdown handler to close down the
1009         * broker cleanly if the JVM is terminated. It is recommended you leave this
1010         * enabled.
1011         */
1012        public void setUseShutdownHook(boolean useShutdownHook) {
1013            this.useShutdownHook = useShutdownHook;
1014        }
1015    
1016        public boolean isAdvisorySupport() {
1017            return advisorySupport;
1018        }
1019    
1020        /**
1021         * Allows the support of advisory messages to be disabled for performance
1022         * reasons.
1023         */
1024        public void setAdvisorySupport(boolean advisorySupport) {
1025            this.advisorySupport = advisorySupport;
1026        }
1027    
1028        public List<TransportConnector> getTransportConnectors() {
1029            return new ArrayList<TransportConnector>(transportConnectors);
1030        }
1031    
1032        /**
1033         * Sets the transport connectors which this broker will listen on for new
1034         * clients
1035         * 
1036         * @org.apache.xbean.Property nestedType="org.apache.activemq.broker.TransportConnector"
1037         */
1038        public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1039            for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1040                TransportConnector connector = iter.next();
1041                addConnector(connector);
1042            }
1043        }
1044    
1045        public List<NetworkConnector> getNetworkConnectors() {
1046            return new ArrayList<NetworkConnector>(networkConnectors);
1047        }
1048    
1049        public List<ProxyConnector> getProxyConnectors() {
1050            return new ArrayList<ProxyConnector>(proxyConnectors);
1051        }
1052    
1053        /**
1054         * Sets the network connectors which this broker will use to connect to
1055         * other brokers in a federated network
1056         * 
1057         * @org.apache.xbean.Property nestedType="org.apache.activemq.network.NetworkConnector"
1058         */
1059        public void setNetworkConnectors(List networkConnectors) throws Exception {
1060            for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1061                NetworkConnector connector = (NetworkConnector)iter.next();
1062                addNetworkConnector(connector);
1063            }
1064        }
1065    
1066        /**
1067         * Sets the network connectors which this broker will use to connect to
1068         * other brokers in a federated network
1069         */
1070        public void setProxyConnectors(List proxyConnectors) throws Exception {
1071            for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1072                ProxyConnector connector = (ProxyConnector)iter.next();
1073                addProxyConnector(connector);
1074            }
1075        }
1076    
1077        public PolicyMap getDestinationPolicy() {
1078            return destinationPolicy;
1079        }
1080    
1081        /**
1082         * Sets the destination specific policies available either for exact
1083         * destinations or for wildcard areas of destinations.
1084         */
1085        public void setDestinationPolicy(PolicyMap policyMap) {
1086            this.destinationPolicy = policyMap;
1087        }
1088    
1089        public BrokerPlugin[] getPlugins() {
1090            return plugins;
1091        }
1092    
1093        /**
1094         * Sets a number of broker plugins to install such as for security
1095         * authentication or authorization
1096         */
1097        public void setPlugins(BrokerPlugin[] plugins) {
1098            this.plugins = plugins;
1099        }
1100    
1101        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1102            return messageAuthorizationPolicy;
1103        }
1104    
1105        /**
1106         * Sets the policy used to decide if the current connection is authorized to
1107         * consume a given message
1108         */
1109        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1110            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1111        }
1112    
1113        /**
1114         * Delete all messages from the persistent store
1115         * 
1116         * @throws IOException
1117         */
1118        public void deleteAllMessages() throws IOException {
1119            getPersistenceAdapter().deleteAllMessages();
1120        }
1121    
1122        public boolean isDeleteAllMessagesOnStartup() {
1123            return deleteAllMessagesOnStartup;
1124        }
1125    
1126        /**
1127         * Sets whether or not all messages are deleted on startup - mostly only
1128         * useful for testing.
1129         */
1130        public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1131            this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1132        }
1133    
1134        public URI getVmConnectorURI() {
1135            if (vmConnectorURI == null) {
1136                try {
1137                    vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1138                } catch (URISyntaxException e) {
1139                    LOG.error("Badly formed URI from " + getBrokerName(), e);
1140                }
1141            }
1142            return vmConnectorURI;
1143        }
1144    
1145        public void setVmConnectorURI(URI vmConnectorURI) {
1146            this.vmConnectorURI = vmConnectorURI;
1147        }
1148    
1149        /**
1150         * @return Returns the shutdownOnMasterFailure.
1151         */
1152        public boolean isShutdownOnMasterFailure() {
1153            return shutdownOnMasterFailure;
1154        }
1155    
1156        /**
1157         * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
1158         */
1159        public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1160            this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1161        }
1162    
1163        public boolean isKeepDurableSubsActive() {
1164            return keepDurableSubsActive;
1165        }
1166    
1167        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1168            this.keepDurableSubsActive = keepDurableSubsActive;
1169        }
1170    
1171        public boolean isUseVirtualTopics() {
1172            return useVirtualTopics;
1173        }
1174    
1175        /**
1176         * Sets whether or not <a
1177         * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1178         * Topics</a> should be supported by default if they have not been
1179         * explicitly configured.
1180         */
1181        public void setUseVirtualTopics(boolean useVirtualTopics) {
1182            this.useVirtualTopics = useVirtualTopics;
1183        }
1184    
1185        public DestinationInterceptor[] getDestinationInterceptors() {
1186            return destinationInterceptors;
1187        }
1188    
1189        public boolean isUseMirroredQueues() {
1190            return useMirroredQueues;
1191        }
1192    
1193        /**
1194         * Sets whether or not <a
1195         * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1196         * Queues</a> should be supported by default if they have not been
1197         * explicitly configured.
1198         */
1199        public void setUseMirroredQueues(boolean useMirroredQueues) {
1200            this.useMirroredQueues = useMirroredQueues;
1201        }
1202    
1203        /**
1204         * Sets the destination interceptors to use
1205         */
1206        public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1207            this.destinationInterceptors = destinationInterceptors;
1208        }
1209    
1210        public ActiveMQDestination[] getDestinations() {
1211            return destinations;
1212        }
1213    
1214        /**
1215         * Sets the destinations which should be loaded/created on startup
1216         */
1217        public void setDestinations(ActiveMQDestination[] destinations) {
1218            this.destinations = destinations;
1219        }
1220    
1221        /**
1222         * @return the tempDataStore
1223         */
1224        public synchronized Store getTempDataStore() {
1225            if (tempDataStore == null) {
1226                
1227                if (!isPersistent()) {
1228                    return null;
1229                }
1230                
1231                boolean result = true;
1232                boolean empty = true;
1233                try {
1234                    File directory = getTmpDataDirectory();
1235                    if (directory.exists() && directory.isDirectory()) {
1236                        File[] files = directory.listFiles();
1237                        if (files != null && files.length > 0) {
1238                            empty = false;
1239                            for (int i = 0; i < files.length; i++) {
1240                                File file = files[i];
1241                                if (!file.isDirectory()) {
1242                                    result &= file.delete();
1243                                }
1244                            }
1245                        }
1246                    }
1247                    if (!empty) {
1248                        String str = result ? "Successfully deleted" : "Failed to delete";
1249                        LOG.info(str + " temporary storage");
1250                    }
1251                    tempDataStore = StoreFactory.open(getTmpDataDirectory(), "rw");
1252                } catch (IOException e) {
1253                    throw new RuntimeException(e);
1254                }
1255            }
1256            return tempDataStore;
1257        }
1258    
1259        /**
1260         * @param tempDataStore the tempDataStore to set
1261         */
1262        public void setTempDataStore(Store tempDataStore) {
1263            this.tempDataStore = tempDataStore;
1264        }
1265    
1266        public int getPersistenceThreadPriority() {
1267            return persistenceThreadPriority;
1268        }
1269    
1270        public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1271            this.persistenceThreadPriority = persistenceThreadPriority;
1272        }
1273    
1274        /**
1275         * @return the useLocalHostBrokerName
1276         */
1277        public boolean isUseLocalHostBrokerName() {
1278            return this.useLocalHostBrokerName;
1279        }
1280    
1281        /**
1282         * @param useLocalHostBrokerName the useLocalHostBrokerName to set
1283         */
1284        public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1285            this.useLocalHostBrokerName = useLocalHostBrokerName;
1286            if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1287                brokerName = LOCAL_HOST_NAME;
1288            }
1289        }
1290    
1291        /**
1292         * @return the supportFailOver
1293         */
1294        public boolean isSupportFailOver() {
1295            return this.supportFailOver;
1296        }
1297    
1298        /**
1299         * @param supportFailOver the supportFailOver to set
1300         */
1301        public void setSupportFailOver(boolean supportFailOver) {
1302            this.supportFailOver = supportFailOver;
1303        }
1304    
1305        /**
1306         * Looks up and lazily creates if necessary the destination for the given JMS name
1307         */
1308        public Destination getDestination(ActiveMQDestination destination) throws Exception {
1309            return getBroker().addDestination(getAdminConnectionContext(), destination);
1310        }
1311        
1312        public void removeDestination(ActiveMQDestination destination) throws Exception {
1313            getBroker().removeDestination(getAdminConnectionContext(), destination,0);
1314        }
1315        
1316        public int getProducerSystemUsagePortion() {
1317            return producerSystemUsagePortion;
1318        }
1319    
1320        public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1321            this.producerSystemUsagePortion = producerSystemUsagePortion;
1322        }
1323    
1324        public int getConsumerSystemUsagePortion() {
1325            return consumerSystemUsagePortion;
1326        }
1327    
1328        public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1329            this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1330        }
1331    
1332        public boolean isSplitSystemUsageForProducersConsumers() {
1333            return splitSystemUsageForProducersConsumers;
1334        }
1335    
1336        public void setSplitSystemUsageForProducersConsumers(
1337                boolean splitSystemUsageForProducersConsumers) {
1338            this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1339        }
1340        
1341        public boolean isMonitorConnectionSplits() {
1342                    return monitorConnectionSplits;
1343            }
1344    
1345            public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1346                    this.monitorConnectionSplits = monitorConnectionSplits;
1347            }
1348            public int getTaskRunnerPriority() {
1349                    return taskRunnerPriority;
1350            }
1351    
1352            public void setTaskRunnerPriority(int taskRunnerPriority) {
1353                    this.taskRunnerPriority = taskRunnerPriority;
1354            }
1355    
1356            public boolean isDedicatedTaskRunner() {
1357                    return dedicatedTaskRunner;
1358            }
1359    
1360            public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1361                    this.dedicatedTaskRunner = dedicatedTaskRunner;
1362            }
1363            
1364            public boolean isCacheTempDestinations() {
1365            return cacheTempDestinations;
1366        }
1367    
1368        public void setCacheTempDestinations(boolean cacheTempDestinations) {
1369            this.cacheTempDestinations = cacheTempDestinations;
1370        }
1371    
1372        public int getTimeBeforePurgeTempDestinations() {
1373            return timeBeforePurgeTempDestinations;
1374        }
1375    
1376        public void setTimeBeforePurgeTempDestinations(
1377                int timeBeforePurgeTempDestinations) {
1378            this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1379        }
1380        
1381        public boolean isUseTempMirroredQueues() {
1382            return useTempMirroredQueues;
1383        }
1384    
1385        public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1386            this.useTempMirroredQueues = useTempMirroredQueues;
1387        }
1388            //
1389        // Implementation methods
1390        // -------------------------------------------------------------------------
1391        /**
1392         * Handles any lazy-creation helper properties which are added to make
1393         * things easier to configure inside environments such as Spring
1394         * 
1395         * @throws Exception
1396         */
1397        protected void processHelperProperties() throws Exception {
1398            boolean masterServiceExists = false;
1399            if (transportConnectorURIs != null) {
1400                for (int i = 0; i < transportConnectorURIs.length; i++) {
1401                    String uri = transportConnectorURIs[i];
1402                    addConnector(uri);
1403                }
1404            }
1405            if (networkConnectorURIs != null) {
1406                for (int i = 0; i < networkConnectorURIs.length; i++) {
1407                    String uri = networkConnectorURIs[i];
1408                    addNetworkConnector(uri);
1409                }
1410            }
1411    
1412            if (jmsBridgeConnectors != null) {
1413                for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1414                    addJmsConnector(jmsBridgeConnectors[i]);
1415                }
1416            }
1417            for (Service service : services) {
1418                if (service instanceof MasterConnector) {
1419                    masterServiceExists = true;
1420                    break;
1421                }
1422            }
1423            if (masterConnectorURI != null) {
1424                if (masterServiceExists) {
1425                    throw new IllegalStateException("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1426                } else {
1427                    addService(new MasterConnector(masterConnectorURI));
1428                }
1429            }
1430        }
1431    
1432        protected void stopAllConnectors(ServiceStopper stopper) {
1433    
1434            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1435                NetworkConnector connector = iter.next();
1436                unregisterNetworkConnectorMBean(connector);
1437                stopper.stop(connector);
1438            }
1439    
1440            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1441                ProxyConnector connector = iter.next();
1442                stopper.stop(connector);
1443            }
1444    
1445            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1446                JmsConnector connector = iter.next();
1447                stopper.stop(connector);
1448            }
1449    
1450            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1451                TransportConnector connector = iter.next();
1452                stopper.stop(connector);
1453            }
1454        }
1455    
1456        protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1457            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1458            if (mbeanServer != null) {
1459    
1460                try {
1461                    ObjectName objectName = createConnectorObjectName(connector);
1462                    connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName);
1463                    ConnectorViewMBean view = new ConnectorView(connector);
1464                    mbeanServer.registerMBean(view, objectName);
1465                    registeredMBeanNames.add(objectName);
1466                    return connector;
1467                } catch (Throwable e) {
1468                    throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1469                }
1470            }
1471            return connector;
1472        }
1473    
1474        protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1475            if (isUseJmx()) {
1476                MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1477                if (mbeanServer != null) {
1478                    try {
1479                        ObjectName objectName = createConnectorObjectName(connector);
1480    
1481                        if (registeredMBeanNames.remove(objectName)) {
1482                            mbeanServer.unregisterMBean(objectName);
1483                        }
1484                    } catch (Throwable e) {
1485                        throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1486                    }
1487                }
1488            }
1489        }
1490        
1491        protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1492    //        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1493    //        if (mbeanServer != null) {
1494    //
1495    //          
1496    //        }
1497            return adaptor;
1498        }
1499    
1500        protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1501            if (isUseJmx()) {
1502                MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1503                if (mbeanServer != null) {
1504                    
1505                }       
1506            }
1507        }
1508    
1509        private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1510            return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector,"
1511                                  + "ConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1512        }
1513    
1514        protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1515            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1516            if (mbeanServer != null) {
1517                NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1518                try {
1519                    ObjectName objectName = createNetworkConnectorObjectName(connector);
1520                    connector.setObjectName(objectName);
1521                    mbeanServer.registerMBean(view, objectName);
1522                    registeredMBeanNames.add(objectName);
1523                } catch (Throwable e) {
1524                    throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1525                }
1526            }
1527        }
1528    
1529        protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
1530            return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1531                                  + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1532        }
1533    
1534        protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1535            if (isUseJmx()) {
1536                MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1537                if (mbeanServer != null) {
1538                    try {
1539                        ObjectName objectName = createNetworkConnectorObjectName(connector);
1540                        if (registeredMBeanNames.remove(objectName)) {
1541                            mbeanServer.unregisterMBean(objectName);
1542                        }
1543                    } catch (Exception e) {
1544                        LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1545                    }
1546                }
1547            }
1548        }
1549    
1550        protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1551            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1552            if (mbeanServer != null) {
1553                ProxyConnectorView view = new ProxyConnectorView(connector);
1554                try {
1555                    ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1556                                                           + "Type=ProxyConnector," + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1557                    mbeanServer.registerMBean(view, objectName);
1558                    registeredMBeanNames.add(objectName);
1559                } catch (Throwable e) {
1560                    throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1561                }
1562            }
1563        }
1564    
1565        protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1566            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1567            if (mbeanServer != null) {
1568                FTConnectorView view = new FTConnectorView(connector);
1569                try {
1570                    ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1571                                                           + "Type=MasterConnector");
1572                    mbeanServer.registerMBean(view, objectName);
1573                    registeredMBeanNames.add(objectName);
1574                } catch (Throwable e) {
1575                    throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1576                }
1577            }
1578        }
1579    
1580        protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1581            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1582            if (mbeanServer != null) {
1583                JmsConnectorView view = new JmsConnectorView(connector);
1584                try {
1585                    ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1586                                                           + "Type=JmsConnector," + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1587                    mbeanServer.registerMBean(view, objectName);
1588                    registeredMBeanNames.add(objectName);
1589                } catch (Throwable e) {
1590                    throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1591                }
1592            }
1593        }
1594    
1595        /**
1596         * Factory method to create a new broker
1597         * 
1598         * @throws Exception
1599         * @throws
1600         * @throws
1601         */
1602        protected Broker createBroker() throws Exception {
1603            regionBroker = createRegionBroker();
1604            Broker broker = addInterceptors(regionBroker);
1605    
1606            // Add a filter that will stop access to the broker once stopped
1607            broker = new MutableBrokerFilter(broker) {
1608                    Broker old;
1609                
1610                    public void stop() throws Exception {
1611                    old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1612                        // Just ignore additional stop actions.
1613                        public void stop() throws Exception {
1614                        }
1615                        
1616                    });
1617                    old.stop();
1618                }
1619                    
1620                    public void start() throws Exception {
1621                            if (forceStart && old != null) {
1622                                    this.next.set(old);
1623                            }
1624                            getNext().start();
1625                    }
1626                
1627            };
1628    
1629            return broker;
1630    
1631        }
1632    
1633        /**
1634         * Factory method to create the core region broker onto which interceptors
1635         * are added
1636         * 
1637         * @throws Exception
1638         */
1639        protected Broker createRegionBroker() throws Exception {
1640            if (destinationInterceptors == null) {
1641                destinationInterceptors = createDefaultDestinationInterceptor();
1642            }
1643            configureServices(destinationInterceptors);
1644    
1645            DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1646            if (destinationFactory == null) {
1647                destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1648            }
1649            return createRegionBroker(destinationInterceptor);
1650        }
1651        
1652        protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1653                        RegionBroker regionBroker;
1654            if (isUseJmx()) {
1655                MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1656                regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
1657                                                       destinationInterceptor);
1658            } else {
1659                regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
1660            }
1661            destinationFactory.setRegionBroker(regionBroker);
1662    
1663            regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1664            regionBroker.setBrokerName(getBrokerName());
1665            regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
1666    
1667                        return regionBroker;
1668            }
1669    
1670        /**
1671         * Create the default destination interceptor
1672         */
1673        protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
1674            List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
1675            if (isUseVirtualTopics()) {
1676                VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
1677                VirtualTopic virtualTopic = new VirtualTopic();
1678                virtualTopic.setName("VirtualTopic.>");
1679                VirtualDestination[] virtualDestinations = {virtualTopic};
1680                interceptor.setVirtualDestinations(virtualDestinations);
1681                answer.add(interceptor);
1682            }
1683            if (isUseMirroredQueues()) {
1684                MirroredQueue interceptor = new MirroredQueue();    
1685                answer.add(interceptor);
1686            }
1687            DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
1688            answer.toArray(array);
1689            return array;
1690        }
1691    
1692        /**
1693         * Strategy method to add interceptors to the broker
1694         * 
1695         * @throws IOException
1696         */
1697        protected Broker addInterceptors(Broker broker) throws Exception {
1698            broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1699            if (isAdvisorySupport()) {
1700                broker = new AdvisoryBroker(broker);
1701            }
1702            broker = new CompositeDestinationBroker(broker);
1703            if (isPopulateJMSXUserID()) {
1704                broker = new UserIDBroker(broker);
1705            }
1706            if (isMonitorConnectionSplits()){
1707                    broker = new ConnectionSplitBroker(broker);
1708            }
1709            if (plugins != null) {
1710                for (int i = 0; i < plugins.length; i++) {
1711                    BrokerPlugin plugin = plugins[i];
1712                    broker = plugin.installPlugin(broker);
1713                }
1714            }
1715            return broker;
1716        }
1717    
1718        protected PersistenceAdapter createPersistenceAdapter() throws IOException {
1719            if (isPersistent()) {
1720                return getPersistenceFactory().createPersistenceAdapter();
1721            } else {
1722                return new MemoryPersistenceAdapter();
1723            }
1724        }
1725    
1726        protected AMQPersistenceAdapterFactory createPersistenceFactory() {
1727            AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
1728            factory.setDataDirectory(getBrokerDataDirectory());
1729            factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory());
1730            factory.setBrokerName(getBrokerName());
1731            return factory;
1732        }
1733    
1734        protected ObjectName createBrokerObjectName() throws IOException {
1735            try {
1736                return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
1737            } catch (Throwable e) {
1738                throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1739            }
1740        }
1741    
1742        protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
1743            TransportServer transport = TransportFactory.bind(this, brokerURI);
1744            return new TransportConnector(transport);
1745        }
1746    
1747        /**
1748         * Extracts the port from the options
1749         */
1750        protected Object getPort(Map options) {
1751            Object port = options.get("port");
1752            if (port == null) {
1753                port = DEFAULT_PORT;
1754                LOG.warn("No port specified so defaulting to: " + port);
1755            }
1756            return port;
1757        }
1758    
1759        protected void addShutdownHook() {
1760            if (useShutdownHook) {
1761                shutdownHook = new Thread("ActiveMQ ShutdownHook") {
1762                    public void run() {
1763                        containerShutdown();
1764                    }
1765                };
1766                Runtime.getRuntime().addShutdownHook(shutdownHook);
1767            }
1768        }
1769    
1770        protected void removeShutdownHook() {
1771            if (shutdownHook != null) {
1772                try {
1773                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
1774                } catch (Exception e) {
1775                    LOG.debug("Caught exception, must be shutting down: " + e);
1776                }
1777            }
1778        }
1779    
1780        /**
1781         * Causes a clean shutdown of the container when the VM is being shut down
1782         */
1783        protected void containerShutdown() {
1784            try {
1785                stop();
1786            } catch (IOException e) {
1787                Throwable linkedException = e.getCause();
1788                if (linkedException != null) {
1789                    logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
1790                } else {
1791                    logError("Failed to shut down: " + e, e);
1792                }
1793                if (!useLoggingForShutdownErrors) {
1794                    e.printStackTrace(System.err);
1795                }
1796            } catch (Exception e) {
1797                logError("Failed to shut down: " + e, e);
1798            }
1799        }
1800    
1801        protected void logError(String message, Throwable e) {
1802            if (useLoggingForShutdownErrors) {
1803                LOG.error("Failed to shut down: " + e);
1804            } else {
1805                System.err.println("Failed to shut down: " + e);
1806            }
1807        }
1808    
1809        /**
1810         * Starts any configured destinations on startup
1811         */
1812        protected void startDestinations() throws Exception {
1813            if (destinations != null) {
1814                ConnectionContext adminConnectionContext = getAdminConnectionContext();
1815    
1816                for (int i = 0; i < destinations.length; i++) {
1817                    ActiveMQDestination destination = destinations[i];
1818                    getBroker().addDestination(adminConnectionContext, destination);
1819                }
1820            }
1821        }
1822    
1823        /**
1824         * Returns the broker's administration connection context used for
1825         * configuring the broker at startup
1826         */
1827        public ConnectionContext getAdminConnectionContext() throws Exception {
1828            ConnectionContext adminConnectionContext = getBroker().getAdminConnectionContext();
1829            if (adminConnectionContext == null) {
1830                adminConnectionContext = createAdminConnectionContext();
1831                getBroker().setAdminConnectionContext(adminConnectionContext);
1832            }
1833            return adminConnectionContext;
1834        }
1835    
1836        /**
1837         * Factory method to create the new administration connection context
1838         * object. Note this method is here rather than inside a default broker
1839         * implementation to ensure that the broker reference inside it is the outer
1840         * most interceptor
1841         */
1842        protected ConnectionContext createAdminConnectionContext() throws Exception {
1843            ConnectionContext context = new ConnectionContext();
1844            context.setBroker(getBroker());
1845            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1846            return context;
1847        }
1848    
1849        protected void waitForSlave(){
1850            try {
1851                    slaveStartSignal.await();
1852            }catch(InterruptedException e){
1853                    LOG.error("Exception waiting for slave:"+e);
1854            }
1855        }
1856        
1857        protected void slaveConnectionEstablished(){
1858            slaveStartSignal.countDown();
1859        }
1860        
1861        
1862        /**
1863         * Start all transport and network connections, proxies and bridges
1864         * 
1865         * @throws Exception
1866         */
1867        protected void startAllConnectors() throws Exception {
1868            if (!isSlave()) {
1869                    Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
1870                List<TransportConnector> al = new ArrayList<TransportConnector>();
1871    
1872                for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1873                    TransportConnector connector = iter.next();
1874                    connector.setBrokerService(this);
1875                    al.add(startTransportConnector(connector));
1876                }
1877    
1878                if (al.size() > 0) {
1879                    // let's clear the transportConnectors list and replace it with
1880                    // the started transportConnector instances
1881                    this.transportConnectors.clear();
1882                    setTransportConnectors(al);
1883                }
1884                URI uri = getVmConnectorURI();
1885                Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
1886                map.put("network", "true");
1887                map.put("async", "false");
1888                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1889                if(isWaitForSlave()){
1890                    waitForSlave();
1891                }
1892                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1893                    NetworkConnector connector = iter.next();
1894                    connector.setLocalUri(uri);
1895                    connector.setBrokerName(getBrokerName());
1896                    connector.setDurableDestinations(durableDestinations);
1897                    connector.start();
1898                }
1899    
1900                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1901                    ProxyConnector connector = iter.next();
1902                    connector.start();
1903                }
1904    
1905                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1906                    JmsConnector connector = iter.next();
1907                    connector.start();
1908                }
1909                for (Service service:services) {
1910                    configureService(service);
1911                    service.start();
1912                }
1913            }
1914        }
1915    
1916        protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
1917            connector.setTaskRunnerFactory(getTaskRunnerFactory());
1918            MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
1919            if (policy != null) {
1920                connector.setMessageAuthorizationPolicy(policy);
1921            }
1922    
1923            if (isUseJmx()) {
1924                connector = registerConnectorMBean(connector);
1925            }
1926    
1927            connector.getStatistics().setEnabled(enableStatistics);
1928    
1929            connector.start();
1930    
1931            return connector;
1932        }
1933    
1934        /**
1935         * Perform any custom dependency injection
1936         */
1937        protected void configureServices(Object[] services) {
1938            for (Object service : services) {
1939                configureService(service);
1940            }
1941        }
1942    
1943        /**
1944         * Perform any custom dependency injection
1945         */
1946        protected void configureService(Object service) {
1947            if (service instanceof BrokerServiceAware) {
1948                BrokerServiceAware serviceAware = (BrokerServiceAware) service;
1949                serviceAware.setBrokerService(this);
1950            }
1951            if (masterConnector == null) {
1952                if (service instanceof MasterConnector) {
1953                    masterConnector = (MasterConnector) service;
1954                    supportFailOver = true;
1955                }
1956            }
1957        }
1958    
1959        /**
1960         * Starts all destiantions in persistence store. This includes all inactive
1961         * destinations
1962         */
1963        protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
1964            Set destinations = destinationFactory.getDestinations();
1965            if (destinations != null) {
1966                Iterator iter = destinations.iterator();
1967    
1968                ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
1969                if (adminConnectionContext == null) {
1970                    ConnectionContext context = new ConnectionContext();
1971                    context.setBroker(broker);
1972                    adminConnectionContext = context;
1973                    broker.setAdminConnectionContext(adminConnectionContext);
1974                }
1975    
1976                while (iter.hasNext()) {
1977                    ActiveMQDestination destination = (ActiveMQDestination)iter.next();
1978                    broker.addDestination(adminConnectionContext, destination);
1979                }
1980            }
1981        }
1982    
1983        public Broker getRegionBroker() {
1984            return regionBroker;
1985        }
1986    
1987        public void setRegionBroker(Broker regionBroker) {
1988            this.regionBroker = regionBroker;
1989        }
1990    
1991        
1992        public void addShutdownHook(Runnable hook) {
1993            synchronized(shutdownHooks) {
1994                shutdownHooks.add(hook);
1995            }
1996        }
1997        
1998        public void removeShutdownHook(Runnable hook) {
1999            synchronized(shutdownHooks) {
2000                shutdownHooks.remove(hook);
2001            }
2002        }
2003    
2004        public boolean isSystemExitOnShutdown() {
2005            return systemExitOnShutdown;
2006        }
2007    
2008        public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2009            this.systemExitOnShutdown = systemExitOnShutdown;
2010        }
2011    
2012        public int getSystemExitOnShutdownExitCode() {
2013            return systemExitOnShutdownExitCode;
2014        }
2015    
2016        public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2017            this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2018        }
2019    
2020        public SslContext getSslContext() {
2021            return sslContext;
2022        }
2023    
2024        public void setSslContext(SslContext sslContext) {
2025            this.sslContext = sslContext;
2026        }
2027    
2028            public boolean isShutdownOnSlaveFailure() {
2029                    return shutdownOnSlaveFailure;
2030            }
2031    
2032            public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2033                    this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2034            }
2035    
2036            public boolean isWaitForSlave() {
2037                    return waitForSlave;
2038            }
2039    
2040            public void setWaitForSlave(boolean waitForSlave) {
2041                    this.waitForSlave = waitForSlave;
2042            }
2043    
2044            public CountDownLatch getSlaveStartSignal() {
2045                    return slaveStartSignal;
2046            }
2047    
2048    }