001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.net.UnknownHostException;
024 import java.util.ArrayList;
025 import java.util.HashMap;
026 import java.util.Iterator;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.CountDownLatch;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034
035 import javax.management.MBeanServer;
036 import javax.management.MalformedObjectNameException;
037 import javax.management.ObjectName;
038
039 import org.apache.activemq.ActiveMQConnectionMetaData;
040 import org.apache.activemq.Service;
041 import org.apache.activemq.advisory.AdvisoryBroker;
042 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
043 import org.apache.activemq.broker.ft.MasterConnector;
044 import org.apache.activemq.broker.jmx.BrokerView;
045 import org.apache.activemq.broker.jmx.ConnectorView;
046 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
047 import org.apache.activemq.broker.jmx.FTConnectorView;
048 import org.apache.activemq.broker.jmx.JmsConnectorView;
049 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
050 import org.apache.activemq.broker.jmx.ManagementContext;
051 import org.apache.activemq.broker.jmx.NetworkConnectorView;
052 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
053 import org.apache.activemq.broker.jmx.ProxyConnectorView;
054 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
055 import org.apache.activemq.broker.region.Destination;
056 import org.apache.activemq.broker.region.DestinationFactory;
057 import org.apache.activemq.broker.region.DestinationFactoryImpl;
058 import org.apache.activemq.broker.region.DestinationInterceptor;
059 import org.apache.activemq.broker.region.RegionBroker;
060 import org.apache.activemq.broker.region.policy.PolicyMap;
061 import org.apache.activemq.broker.region.virtual.MirroredQueue;
062 import org.apache.activemq.broker.region.virtual.VirtualDestination;
063 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
064 import org.apache.activemq.broker.region.virtual.VirtualTopic;
065 import org.apache.activemq.command.ActiveMQDestination;
066 import org.apache.activemq.command.BrokerId;
067 import org.apache.activemq.kaha.Store;
068 import org.apache.activemq.kaha.StoreFactory;
069 import org.apache.activemq.network.ConnectionFilter;
070 import org.apache.activemq.network.DiscoveryNetworkConnector;
071 import org.apache.activemq.network.NetworkConnector;
072 import org.apache.activemq.network.jms.JmsConnector;
073 import org.apache.activemq.proxy.ProxyConnector;
074 import org.apache.activemq.security.MessageAuthorizationPolicy;
075 import org.apache.activemq.security.SecurityContext;
076 import org.apache.activemq.selector.SelectorParser;
077 import org.apache.activemq.store.PersistenceAdapter;
078 import org.apache.activemq.store.PersistenceAdapterFactory;
079 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
080 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
081 import org.apache.activemq.thread.TaskRunnerFactory;
082 import org.apache.activemq.transport.TransportFactory;
083 import org.apache.activemq.transport.TransportServer;
084 import org.apache.activemq.transport.vm.VMTransportFactory;
085 import org.apache.activemq.usage.SystemUsage;
086 import org.apache.activemq.util.IOExceptionSupport;
087 import org.apache.activemq.util.IOHelper;
088 import org.apache.activemq.util.JMXSupport;
089 import org.apache.activemq.util.ServiceStopper;
090 import org.apache.activemq.util.URISupport;
091 import org.apache.commons.logging.Log;
092 import org.apache.commons.logging.LogFactory;
093
094 /**
095 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
096 * number of transport connectors, network connectors and a bunch of properties
097 * which can be used to configure the broker as its lazily created.
098 *
099 * @version $Revision: 1.1 $
100 */
101 public class BrokerService implements Service {
102 protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
103 public static final String DEFAULT_PORT = "61616";
104 public static final String LOCAL_HOST_NAME;
105 public static final String DEFAULT_BROKER_NAME = "localhost";
106
107 private static final Log LOG = LogFactory.getLog(BrokerService.class);
108 private static final long serialVersionUID = 7353129142305630237L;
109
110 private boolean useJmx = true;
111 private boolean enableStatistics = true;
112 private boolean persistent = true;
113 private boolean populateJMSXUserID;
114 private boolean useShutdownHook = true;
115 private boolean useLoggingForShutdownErrors;
116 private boolean shutdownOnMasterFailure;
117 private boolean shutdownOnSlaveFailure;
118 private boolean waitForSlave;
119 private String brokerName = DEFAULT_BROKER_NAME;
120 private File dataDirectoryFile;
121 private File tmpDataDirectory;
122 private Broker broker;
123 private BrokerView adminView;
124 private ManagementContext managementContext;
125 private ObjectName brokerObjectName;
126 private TaskRunnerFactory taskRunnerFactory;
127 private TaskRunnerFactory persistenceTaskRunnerFactory;
128 private SystemUsage systemUsage;
129 private SystemUsage producerSystemUsage;
130 private SystemUsage consumerSystemUsaage;
131 private PersistenceAdapter persistenceAdapter;
132 private PersistenceAdapterFactory persistenceFactory;
133 protected DestinationFactory destinationFactory;
134 private MessageAuthorizationPolicy messageAuthorizationPolicy;
135 private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
136 private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
137 private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
138 private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
139 private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
140 private List<Service> services = new ArrayList<Service>();
141 private MasterConnector masterConnector;
142 private String masterConnectorURI;
143 private transient Thread shutdownHook;
144 private String[] transportConnectorURIs;
145 private String[] networkConnectorURIs;
146 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
147 // to other jms messaging
148 // systems
149 private boolean deleteAllMessagesOnStartup;
150 private boolean advisorySupport = true;
151 private URI vmConnectorURI;
152 private PolicyMap destinationPolicy;
153 private AtomicBoolean started = new AtomicBoolean(false);
154 private AtomicBoolean stopped = new AtomicBoolean(false);
155 private BrokerPlugin[] plugins;
156 private boolean keepDurableSubsActive = true;
157 private boolean useVirtualTopics = true;
158 private boolean useMirroredQueues = false;
159 private boolean useTempMirroredQueues=true;
160 private BrokerId brokerId;
161 private DestinationInterceptor[] destinationInterceptors;
162 private ActiveMQDestination[] destinations;
163 private Store tempDataStore;
164 private int persistenceThreadPriority = Thread.MAX_PRIORITY;
165 private boolean useLocalHostBrokerName;
166 private CountDownLatch stoppedLatch = new CountDownLatch(1);
167 private CountDownLatch startedLatch = new CountDownLatch(1);
168 private boolean supportFailOver;
169 private Broker regionBroker;
170 private int producerSystemUsagePortion = 60;
171 private int consumerSystemUsagePortion = 40;
172 private boolean splitSystemUsageForProducersConsumers;
173 private boolean monitorConnectionSplits=false;
174 private int taskRunnerPriority = Thread.NORM_PRIORITY;
175 private boolean dedicatedTaskRunner;
176 private boolean cacheTempDestinations=false;//useful for failover
177 private int timeBeforePurgeTempDestinations = 5000;
178 private List<Runnable> shutdownHooks= new ArrayList<Runnable>();
179 private boolean systemExitOnShutdown;
180 private int systemExitOnShutdownExitCode;
181 private SslContext sslContext;
182
183 private boolean forceStart = false;
184
185 static {
186 String localHostName = "localhost";
187 try {
188 localHostName = java.net.InetAddress.getLocalHost().getHostName();
189 } catch (UnknownHostException e) {
190 LOG.error("Failed to resolve localhost");
191 }
192 LOCAL_HOST_NAME = localHostName;
193 }
194
195 @Override
196 public String toString() {
197 return "BrokerService[" + getBrokerName() + "]";
198 }
199
200 /**
201 * Adds a new transport connector for the given bind address
202 *
203 * @return the newly created and added transport connector
204 * @throws Exception
205 */
206 public TransportConnector addConnector(String bindAddress) throws Exception {
207 return addConnector(new URI(bindAddress));
208 }
209
210 /**
211 * Adds a new transport connector for the given bind address
212 *
213 * @return the newly created and added transport connector
214 * @throws Exception
215 */
216 public TransportConnector addConnector(URI bindAddress) throws Exception {
217 return addConnector(createTransportConnector(bindAddress));
218 }
219
220 /**
221 * Adds a new transport connector for the given TransportServer transport
222 *
223 * @return the newly created and added transport connector
224 * @throws Exception
225 */
226 public TransportConnector addConnector(TransportServer transport) throws Exception {
227 return addConnector(new TransportConnector(transport));
228 }
229
230 /**
231 * Adds a new transport connector
232 *
233 * @return the transport connector
234 * @throws Exception
235 */
236 public TransportConnector addConnector(TransportConnector connector) throws Exception {
237
238 transportConnectors.add(connector);
239
240 return connector;
241 }
242
243 /**
244 * Stops and removes a transport connector from the broker.
245 *
246 * @param connector
247 * @return true if the connector has been previously added to the broker
248 * @throws Exception
249 */
250 public boolean removeConnector(TransportConnector connector) throws Exception {
251 boolean rc = transportConnectors.remove(connector);
252 if (rc) {
253 unregisterConnectorMBean(connector);
254 }
255 return rc;
256
257 }
258
259 /**
260 * Adds a new network connector using the given discovery address
261 *
262 * @return the newly created and added network connector
263 * @throws Exception
264 */
265 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
266 return addNetworkConnector(new URI(discoveryAddress));
267 }
268
269 /**
270 * Adds a new proxy connector using the given bind address
271 *
272 * @return the newly created and added network connector
273 * @throws Exception
274 */
275 public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
276 return addProxyConnector(new URI(bindAddress));
277 }
278
279 /**
280 * Adds a new network connector using the given discovery address
281 *
282 * @return the newly created and added network connector
283 * @throws Exception
284 */
285 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
286 if (!isAdvisorySupport()) {
287 throw new javax.jms.IllegalStateException("Networks require advisory messages to function - advisories are currently disabled");
288 }
289 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
290 return addNetworkConnector(connector);
291 }
292
293 /**
294 * Adds a new proxy connector using the given bind address
295 *
296 * @return the newly created and added network connector
297 * @throws Exception
298 */
299 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
300 ProxyConnector connector = new ProxyConnector();
301 connector.setBind(bindAddress);
302 connector.setRemote(new URI("fanout:multicast://default"));
303 return addProxyConnector(connector);
304 }
305
306 /**
307 * Adds a new network connector to connect this broker to a federated
308 * network
309 */
310 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
311 connector.setBrokerService(this);
312 URI uri = getVmConnectorURI();
313 Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
314 map.put("network", "true");
315 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
316 connector.setLocalUri(uri);
317
318 // Set a connection filter so that the connector does not establish loop
319 // back connections.
320 connector.setConnectionFilter(new ConnectionFilter() {
321 public boolean connectTo(URI location) {
322 List<TransportConnector> transportConnectors = getTransportConnectors();
323 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
324 try {
325 TransportConnector tc = iter.next();
326 if (location.equals(tc.getConnectUri())) {
327 return false;
328 }
329 } catch (Throwable e) {
330 }
331 }
332 return true;
333 }
334 });
335
336 networkConnectors.add(connector);
337 if (isUseJmx()) {
338 registerNetworkConnectorMBean(connector);
339 }
340 return connector;
341 }
342
343 /**
344 * Removes the given network connector without stopping it. The caller
345 * should call {@link NetworkConnector#stop()} to close the connector
346 */
347 public boolean removeNetworkConnector(NetworkConnector connector) {
348 boolean answer = networkConnectors.remove(connector);
349 if (answer) {
350 unregisterNetworkConnectorMBean(connector);
351 }
352 return answer;
353 }
354
355 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
356 URI uri = getVmConnectorURI();
357 connector.setLocalUri(uri);
358 proxyConnectors.add(connector);
359 if (isUseJmx()) {
360 registerProxyConnectorMBean(connector);
361 }
362 return connector;
363 }
364
365 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
366 connector.setBrokerService(this);
367 jmsConnectors.add(connector);
368 if (isUseJmx()) {
369 registerJmsConnectorMBean(connector);
370 }
371 return connector;
372 }
373
374 public JmsConnector removeJmsConnector(JmsConnector connector) {
375 if (jmsConnectors.remove(connector)) {
376 return connector;
377 }
378 return null;
379 }
380
381 /**
382 * @return Returns the masterConnectorURI.
383 */
384 public String getMasterConnectorURI() {
385 return masterConnectorURI;
386 }
387
388 /**
389 * @param masterConnectorURI The masterConnectorURI to set.
390 */
391 public void setMasterConnectorURI(String masterConnectorURI) {
392 this.masterConnectorURI = masterConnectorURI;
393 }
394
395 /**
396 * @return true if this Broker is a slave to a Master
397 */
398 public boolean isSlave() {
399 return masterConnector != null && masterConnector.isSlave();
400 }
401
402 public void masterFailed() {
403 if (shutdownOnMasterFailure) {
404 LOG.fatal("The Master has failed ... shutting down");
405 try {
406 stop();
407 } catch (Exception e) {
408 LOG.error("Failed to stop for master failure", e);
409 }
410 } else {
411 LOG.warn("Master Failed - starting all connectors");
412 try {
413 startAllConnectors();
414 broker.nowMasterBroker();
415 } catch (Exception e) {
416 LOG.error("Failed to startAllConnectors");
417 }
418 }
419 }
420
421 public boolean isStarted() {
422 return started.get();
423 }
424
425 public void start(boolean force) throws Exception {
426 forceStart = force;
427 start();
428 }
429
430 // Service interface
431 // -------------------------------------------------------------------------
432 public void start() throws Exception {
433 if (!started.compareAndSet(false, true)) {
434 // lets just ignore redundant start() calls
435 // as its way too easy to not be completely sure if start() has been
436 // called or not with the gazillion of different configuration
437 // mechanisms
438
439 // throw new IllegalStateException("Allready started.");
440 return;
441 }
442
443 try {
444
445 if( systemExitOnShutdown ) {
446 addShutdownHook(new Runnable(){
447 public void run() {
448 System.exit(systemExitOnShutdownExitCode);
449 }
450 });
451 }
452
453 processHelperProperties();
454
455
456
457 getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
458 getPersistenceAdapter().setBrokerName(getBrokerName());
459 LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
460 if (deleteAllMessagesOnStartup) {
461 deleteAllMessages();
462 }
463 getPersistenceAdapter().start();
464
465 startDestinations();
466
467 addShutdownHook();
468
469 getBroker().start();
470
471 if (isUseJmx()) {
472 getManagementContext().start();
473 ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
474 managedBroker.setContextBroker(broker);
475 adminView = new BrokerView(this, managedBroker);
476 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
477 if (mbeanServer != null) {
478 ObjectName objectName = getBrokerObjectName();
479 mbeanServer.registerMBean(adminView, objectName);
480 registeredMBeanNames.add(objectName);
481 }
482 }
483
484 BrokerRegistry.getInstance().bind(getBrokerName(), this);
485
486 // see if there is a MasterBroker service and if so, configure
487 // it and start it.
488 for (Service service : services) {
489 if (service instanceof MasterConnector) {
490 configureService(service);
491 service.start();
492 }
493 }
494 if (!isSlave()) {
495 startAllConnectors();
496 }
497
498 if (isUseJmx() && masterConnector != null) {
499 registerFTConnectorMBean(masterConnector);
500 }
501
502 brokerId = broker.getBrokerId();
503 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
504 getBroker().brokerServiceStarted();
505 startedLatch.countDown();
506 } catch (Exception e) {
507 LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
508 try{
509 stop();
510 }catch(Exception ex) {
511 LOG.warn("Failed to stop broker after failure in start ",ex);
512 }
513 throw e;
514 }
515 }
516
517 public void stop() throws Exception {
518 if (!started.compareAndSet(true, false)) {
519 return;
520 }
521 LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
522 removeShutdownHook();
523 ServiceStopper stopper = new ServiceStopper();
524 if (services != null) {
525 for (Service service: services) {
526 stopper.stop(service);
527 }
528 }
529 stopAllConnectors(stopper);
530 // remove any VMTransports connected
531 // this has to be done after services are stopped,
532 // to avoid timimg issue with discovery (spinning up a new instance)
533 BrokerRegistry.getInstance().unbind(getBrokerName());
534 VMTransportFactory.stopped(getBrokerName());
535 if (broker != null) {
536 stopper.stop(broker);
537 }
538 if (tempDataStore != null) {
539 tempDataStore.close();
540 }
541 stopper.stop(persistenceAdapter);
542 if (isUseJmx()) {
543 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
544 if (mbeanServer != null) {
545 for (Iterator<ObjectName> iter = registeredMBeanNames.iterator(); iter.hasNext();) {
546 ObjectName name = iter.next();
547 try {
548 mbeanServer.unregisterMBean(name);
549 } catch (Exception e) {
550 stopper.onException(mbeanServer, e);
551 }
552 }
553 }
554 registeredMBeanNames.clear();
555 stopper.stop(getManagementContext());
556 }
557 // Clear SelectorParser cache to free memory
558 SelectorParser.clearCache();
559 stopped.set(true);
560 stoppedLatch.countDown();
561 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
562 synchronized(shutdownHooks) {
563 for (Runnable hook : shutdownHooks) {
564 try {
565 hook.run();
566 } catch ( Throwable e ) {
567 stopper.onException(hook, e);
568 }
569 }
570 }
571 stopper.throwFirstException();
572 }
573
574 /**
575 * A helper method to block the caller thread until the broker has been
576 * stopped
577 */
578 public void waitUntilStopped() {
579 while (isStarted() && !stopped.get()) {
580 try {
581 stoppedLatch.await();
582 } catch (InterruptedException e) {
583 // ignore
584 }
585 }
586 }
587
588
589 /**
590 * A helper method to block the caller thread until the broker has been
591 * started
592 */
593 public void waitUntilStarted() {
594 boolean waitSucceeded = false;
595 while (isStarted() && !stopped.get() && !waitSucceeded) {
596 try {
597 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
598 } catch (InterruptedException ignore) {
599 }
600 }
601 }
602
603 // Properties
604 // -------------------------------------------------------------------------
605
606 /**
607 * Returns the message broker
608 */
609 public Broker getBroker() throws Exception {
610 if (broker == null) {
611 LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" + getBrokerName() + ") is starting");
612 LOG.info("For help or more information please see: http://activemq.apache.org/");
613 broker = createBroker();
614 }
615 return broker;
616 }
617
618 /**
619 * Returns the administration view of the broker; used to create and destroy
620 * resources such as queues and topics. Note this method returns null if JMX
621 * is disabled.
622 */
623 public BrokerView getAdminView() throws Exception {
624 if (adminView == null) {
625 // force lazy creation
626 getBroker();
627 }
628 return adminView;
629 }
630
631 public void setAdminView(BrokerView adminView) {
632 this.adminView = adminView;
633 }
634
635 public String getBrokerName() {
636 return brokerName;
637 }
638
639 /**
640 * Sets the name of this broker; which must be unique in the network
641 *
642 * @param brokerName
643 */
644 public void setBrokerName(String brokerName) {
645 if (brokerName == null) {
646 throw new NullPointerException("The broker name cannot be null");
647 }
648 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
649 if (!str.equals(brokerName)) {
650 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
651 }
652 this.brokerName = str.trim();
653
654 }
655
656 public PersistenceAdapterFactory getPersistenceFactory() {
657 if (persistenceFactory == null) {
658 persistenceFactory = createPersistenceFactory();
659 }
660 return persistenceFactory;
661 }
662
663 public File getDataDirectoryFile() {
664 if (dataDirectoryFile == null) {
665 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
666 }
667 return dataDirectoryFile;
668 }
669
670 public File getBrokerDataDirectory() {
671 String brokerDir = getBrokerName();
672 return new File(getDataDirectoryFile(), brokerDir);
673 }
674
675 /**
676 * Sets the directory in which the data files will be stored by default for
677 * the JDBC and Journal persistence adaptors.
678 *
679 * @param dataDirectory the directory to store data files
680 */
681 public void setDataDirectory(String dataDirectory) {
682 setDataDirectoryFile(new File(dataDirectory));
683 }
684
685 /**
686 * Sets the directory in which the data files will be stored by default for
687 * the JDBC and Journal persistence adaptors.
688 *
689 * @param dataDirectoryFile the directory to store data files
690 */
691 public void setDataDirectoryFile(File dataDirectoryFile) {
692 this.dataDirectoryFile = dataDirectoryFile;
693 }
694
695 /**
696 * @return the tmpDataDirectory
697 */
698 public File getTmpDataDirectory() {
699 if (tmpDataDirectory == null) {
700 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
701 }
702 return tmpDataDirectory;
703 }
704
705 /**
706 * @param tmpDataDirectory the tmpDataDirectory to set
707 */
708 public void setTmpDataDirectory(File tmpDataDirectory) {
709 this.tmpDataDirectory = tmpDataDirectory;
710 }
711
712 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
713 this.persistenceFactory = persistenceFactory;
714 }
715
716 public void setDestinationFactory(DestinationFactory destinationFactory) {
717 this.destinationFactory = destinationFactory;
718 }
719
720 public boolean isPersistent() {
721 return persistent;
722 }
723
724 /**
725 * Sets whether or not persistence is enabled or disabled.
726 */
727 public void setPersistent(boolean persistent) {
728 this.persistent = persistent;
729 }
730
731 public boolean isPopulateJMSXUserID() {
732 return populateJMSXUserID;
733 }
734
735 /**
736 * Sets whether or not the broker should populate the JMSXUserID header.
737 */
738 public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
739 this.populateJMSXUserID = populateJMSXUserID;
740 }
741
742 public SystemUsage getSystemUsage() {
743 try {
744 if (systemUsage == null) {
745 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
746 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg
747 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 Gb
748 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
749 addService(this.systemUsage);
750 }
751 return systemUsage;
752 } catch (IOException e) {
753 LOG.fatal("Cannot create SystemUsage", e);
754 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
755 }
756 }
757
758 public void setSystemUsage(SystemUsage memoryManager) {
759 if (this.systemUsage != null) {
760 removeService(this.systemUsage);
761 }
762 this.systemUsage = memoryManager;
763 addService(this.systemUsage);
764 }
765
766 /**
767 * @return the consumerUsageManager
768 * @throws IOException
769 */
770 public SystemUsage getConsumerSystemUsage() throws IOException {
771 if (this.consumerSystemUsaage == null) {
772 if(splitSystemUsageForProducersConsumers) {
773 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
774 float portion = consumerSystemUsagePortion/100f;
775 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
776 addService(this.consumerSystemUsaage);
777 }else {
778 consumerSystemUsaage=getSystemUsage();
779 }
780 }
781 return this.consumerSystemUsaage;
782 }
783
784 /**
785 * @param consumerSystemUsaage the storeSystemUsage to set
786 */
787 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
788 if (this.consumerSystemUsaage != null) {
789 removeService(this.consumerSystemUsaage);
790 }
791 this.consumerSystemUsaage = consumerSystemUsaage;
792 addService(this.consumerSystemUsaage);
793 }
794
795 /**
796 * @return the producerUsageManager
797 * @throws IOException
798 */
799 public SystemUsage getProducerSystemUsage() throws IOException {
800 if (producerSystemUsage == null ) {
801 if (splitSystemUsageForProducersConsumers) {
802 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
803 float portion = producerSystemUsagePortion/100f;
804 producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
805 addService(producerSystemUsage);
806 }else {
807 producerSystemUsage=getSystemUsage();
808 }
809 }
810 return producerSystemUsage;
811 }
812
813 /**
814 * @param producerUsageManager the producerUsageManager to set
815 */
816 public void setProducerSystemUsage(SystemUsage producerUsageManager) {
817 if (this.producerSystemUsage != null) {
818 removeService(this.producerSystemUsage);
819 }
820 this.producerSystemUsage = producerUsageManager;
821 addService(this.producerSystemUsage);
822 }
823
824 public PersistenceAdapter getPersistenceAdapter() throws IOException {
825 if (persistenceAdapter == null) {
826 persistenceAdapter = createPersistenceAdapter();
827 configureService(persistenceAdapter);
828 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
829 }
830 return persistenceAdapter;
831 }
832
833 /**
834 * Sets the persistence adaptor implementation to use for this broker
835 * @throws IOException
836 */
837 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
838 this.persistenceAdapter = persistenceAdapter;
839 configureService(this.persistenceAdapter);
840 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
841
842 }
843
844 public TaskRunnerFactory getTaskRunnerFactory() {
845 if (taskRunnerFactory == null) {
846 taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner());
847 }
848 return taskRunnerFactory;
849 }
850
851 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
852 this.taskRunnerFactory = taskRunnerFactory;
853 }
854
855 public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
856 if (taskRunnerFactory == null) {
857 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, true, 1000);
858 }
859 return persistenceTaskRunnerFactory;
860 }
861
862 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
863 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
864 }
865
866 public boolean isUseJmx() {
867 return useJmx;
868 }
869
870 public boolean isEnableStatistics() {
871 return enableStatistics;
872 }
873
874 /**
875 * Sets whether or not the Broker's services enable statistics or not.
876 */
877 public void setEnableStatistics(boolean enableStatistics) {
878 this.enableStatistics = enableStatistics;
879 }
880
881 /**
882 * Sets whether or not the Broker's services should be exposed into JMX or
883 * not.
884 */
885 public void setUseJmx(boolean useJmx) {
886 this.useJmx = useJmx;
887 }
888
889 public ObjectName getBrokerObjectName() throws IOException {
890 if (brokerObjectName == null) {
891 brokerObjectName = createBrokerObjectName();
892 }
893 return brokerObjectName;
894 }
895
896 /**
897 * Sets the JMX ObjectName for this broker
898 */
899 public void setBrokerObjectName(ObjectName brokerObjectName) {
900 this.brokerObjectName = brokerObjectName;
901 }
902
903 public ManagementContext getManagementContext() {
904 if (managementContext == null) {
905 managementContext = new ManagementContext();
906 }
907 return managementContext;
908 }
909
910 public void setManagementContext(ManagementContext managementContext) {
911 this.managementContext = managementContext;
912 }
913
914 public NetworkConnector getNetworkConnectorByName(String connectorName) {
915 for(NetworkConnector connector : networkConnectors) {
916 if(connector.getName().equals(connectorName)) {
917 return connector;
918 }
919 }
920 return null;
921 }
922
923 public String[] getNetworkConnectorURIs() {
924 return networkConnectorURIs;
925 }
926
927 public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
928 this.networkConnectorURIs = networkConnectorURIs;
929 }
930
931 public TransportConnector getConnectorByName(String connectorName) {
932 for(TransportConnector connector : transportConnectors) {
933 if(connector.getName().equals(connectorName)) {
934 return connector;
935 }
936 }
937 return null;
938 }
939
940 public String[] getTransportConnectorURIs() {
941 return transportConnectorURIs;
942 }
943
944 public void setTransportConnectorURIs(String[] transportConnectorURIs) {
945 this.transportConnectorURIs = transportConnectorURIs;
946 }
947
948 /**
949 * @return Returns the jmsBridgeConnectors.
950 */
951 public JmsConnector[] getJmsBridgeConnectors() {
952 return jmsBridgeConnectors;
953 }
954
955 /**
956 * @param jmsConnectors The jmsBridgeConnectors to set.
957 */
958 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
959 this.jmsBridgeConnectors = jmsConnectors;
960 }
961
962 public Service[] getServices() {
963 return (Service[]) services.toArray();
964 }
965
966 /**
967 * Sets the services associated with this broker such as a
968 * {@link MasterConnector}
969 */
970 public void setServices(Service[] services) {
971 this.services.clear();
972 if (services != null) {
973 for (int i=0; i < services.length;i++) {
974 this.services.add(services[i]);
975 }
976 }
977 }
978
979 /**
980 * Adds a new service so that it will be started as part of the broker
981 * lifecycle
982 */
983 public void addService(Service service) {
984 services.add(service);
985 }
986
987 public void removeService(Service service) {
988 services.remove(service);
989 }
990
991 public boolean isUseLoggingForShutdownErrors() {
992 return useLoggingForShutdownErrors;
993 }
994
995 /**
996 * Sets whether or not we should use commons-logging when reporting errors
997 * when shutting down the broker
998 */
999 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1000 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1001 }
1002
1003 public boolean isUseShutdownHook() {
1004 return useShutdownHook;
1005 }
1006
1007 /**
1008 * Sets whether or not we should use a shutdown handler to close down the
1009 * broker cleanly if the JVM is terminated. It is recommended you leave this
1010 * enabled.
1011 */
1012 public void setUseShutdownHook(boolean useShutdownHook) {
1013 this.useShutdownHook = useShutdownHook;
1014 }
1015
1016 public boolean isAdvisorySupport() {
1017 return advisorySupport;
1018 }
1019
1020 /**
1021 * Allows the support of advisory messages to be disabled for performance
1022 * reasons.
1023 */
1024 public void setAdvisorySupport(boolean advisorySupport) {
1025 this.advisorySupport = advisorySupport;
1026 }
1027
1028 public List<TransportConnector> getTransportConnectors() {
1029 return new ArrayList<TransportConnector>(transportConnectors);
1030 }
1031
1032 /**
1033 * Sets the transport connectors which this broker will listen on for new
1034 * clients
1035 *
1036 * @org.apache.xbean.Property nestedType="org.apache.activemq.broker.TransportConnector"
1037 */
1038 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1039 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1040 TransportConnector connector = iter.next();
1041 addConnector(connector);
1042 }
1043 }
1044
1045 public List<NetworkConnector> getNetworkConnectors() {
1046 return new ArrayList<NetworkConnector>(networkConnectors);
1047 }
1048
1049 public List<ProxyConnector> getProxyConnectors() {
1050 return new ArrayList<ProxyConnector>(proxyConnectors);
1051 }
1052
1053 /**
1054 * Sets the network connectors which this broker will use to connect to
1055 * other brokers in a federated network
1056 *
1057 * @org.apache.xbean.Property nestedType="org.apache.activemq.network.NetworkConnector"
1058 */
1059 public void setNetworkConnectors(List networkConnectors) throws Exception {
1060 for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1061 NetworkConnector connector = (NetworkConnector)iter.next();
1062 addNetworkConnector(connector);
1063 }
1064 }
1065
1066 /**
1067 * Sets the network connectors which this broker will use to connect to
1068 * other brokers in a federated network
1069 */
1070 public void setProxyConnectors(List proxyConnectors) throws Exception {
1071 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1072 ProxyConnector connector = (ProxyConnector)iter.next();
1073 addProxyConnector(connector);
1074 }
1075 }
1076
1077 public PolicyMap getDestinationPolicy() {
1078 return destinationPolicy;
1079 }
1080
1081 /**
1082 * Sets the destination specific policies available either for exact
1083 * destinations or for wildcard areas of destinations.
1084 */
1085 public void setDestinationPolicy(PolicyMap policyMap) {
1086 this.destinationPolicy = policyMap;
1087 }
1088
1089 public BrokerPlugin[] getPlugins() {
1090 return plugins;
1091 }
1092
1093 /**
1094 * Sets a number of broker plugins to install such as for security
1095 * authentication or authorization
1096 */
1097 public void setPlugins(BrokerPlugin[] plugins) {
1098 this.plugins = plugins;
1099 }
1100
1101 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1102 return messageAuthorizationPolicy;
1103 }
1104
1105 /**
1106 * Sets the policy used to decide if the current connection is authorized to
1107 * consume a given message
1108 */
1109 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1110 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1111 }
1112
1113 /**
1114 * Delete all messages from the persistent store
1115 *
1116 * @throws IOException
1117 */
1118 public void deleteAllMessages() throws IOException {
1119 getPersistenceAdapter().deleteAllMessages();
1120 }
1121
1122 public boolean isDeleteAllMessagesOnStartup() {
1123 return deleteAllMessagesOnStartup;
1124 }
1125
1126 /**
1127 * Sets whether or not all messages are deleted on startup - mostly only
1128 * useful for testing.
1129 */
1130 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1131 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1132 }
1133
1134 public URI getVmConnectorURI() {
1135 if (vmConnectorURI == null) {
1136 try {
1137 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1138 } catch (URISyntaxException e) {
1139 LOG.error("Badly formed URI from " + getBrokerName(), e);
1140 }
1141 }
1142 return vmConnectorURI;
1143 }
1144
1145 public void setVmConnectorURI(URI vmConnectorURI) {
1146 this.vmConnectorURI = vmConnectorURI;
1147 }
1148
1149 /**
1150 * @return Returns the shutdownOnMasterFailure.
1151 */
1152 public boolean isShutdownOnMasterFailure() {
1153 return shutdownOnMasterFailure;
1154 }
1155
1156 /**
1157 * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
1158 */
1159 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1160 this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1161 }
1162
1163 public boolean isKeepDurableSubsActive() {
1164 return keepDurableSubsActive;
1165 }
1166
1167 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1168 this.keepDurableSubsActive = keepDurableSubsActive;
1169 }
1170
1171 public boolean isUseVirtualTopics() {
1172 return useVirtualTopics;
1173 }
1174
1175 /**
1176 * Sets whether or not <a
1177 * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1178 * Topics</a> should be supported by default if they have not been
1179 * explicitly configured.
1180 */
1181 public void setUseVirtualTopics(boolean useVirtualTopics) {
1182 this.useVirtualTopics = useVirtualTopics;
1183 }
1184
1185 public DestinationInterceptor[] getDestinationInterceptors() {
1186 return destinationInterceptors;
1187 }
1188
1189 public boolean isUseMirroredQueues() {
1190 return useMirroredQueues;
1191 }
1192
1193 /**
1194 * Sets whether or not <a
1195 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1196 * Queues</a> should be supported by default if they have not been
1197 * explicitly configured.
1198 */
1199 public void setUseMirroredQueues(boolean useMirroredQueues) {
1200 this.useMirroredQueues = useMirroredQueues;
1201 }
1202
1203 /**
1204 * Sets the destination interceptors to use
1205 */
1206 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1207 this.destinationInterceptors = destinationInterceptors;
1208 }
1209
1210 public ActiveMQDestination[] getDestinations() {
1211 return destinations;
1212 }
1213
1214 /**
1215 * Sets the destinations which should be loaded/created on startup
1216 */
1217 public void setDestinations(ActiveMQDestination[] destinations) {
1218 this.destinations = destinations;
1219 }
1220
1221 /**
1222 * @return the tempDataStore
1223 */
1224 public synchronized Store getTempDataStore() {
1225 if (tempDataStore == null) {
1226
1227 if (!isPersistent()) {
1228 return null;
1229 }
1230
1231 boolean result = true;
1232 boolean empty = true;
1233 try {
1234 File directory = getTmpDataDirectory();
1235 if (directory.exists() && directory.isDirectory()) {
1236 File[] files = directory.listFiles();
1237 if (files != null && files.length > 0) {
1238 empty = false;
1239 for (int i = 0; i < files.length; i++) {
1240 File file = files[i];
1241 if (!file.isDirectory()) {
1242 result &= file.delete();
1243 }
1244 }
1245 }
1246 }
1247 if (!empty) {
1248 String str = result ? "Successfully deleted" : "Failed to delete";
1249 LOG.info(str + " temporary storage");
1250 }
1251 tempDataStore = StoreFactory.open(getTmpDataDirectory(), "rw");
1252 } catch (IOException e) {
1253 throw new RuntimeException(e);
1254 }
1255 }
1256 return tempDataStore;
1257 }
1258
1259 /**
1260 * @param tempDataStore the tempDataStore to set
1261 */
1262 public void setTempDataStore(Store tempDataStore) {
1263 this.tempDataStore = tempDataStore;
1264 }
1265
1266 public int getPersistenceThreadPriority() {
1267 return persistenceThreadPriority;
1268 }
1269
1270 public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1271 this.persistenceThreadPriority = persistenceThreadPriority;
1272 }
1273
1274 /**
1275 * @return the useLocalHostBrokerName
1276 */
1277 public boolean isUseLocalHostBrokerName() {
1278 return this.useLocalHostBrokerName;
1279 }
1280
1281 /**
1282 * @param useLocalHostBrokerName the useLocalHostBrokerName to set
1283 */
1284 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1285 this.useLocalHostBrokerName = useLocalHostBrokerName;
1286 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1287 brokerName = LOCAL_HOST_NAME;
1288 }
1289 }
1290
1291 /**
1292 * @return the supportFailOver
1293 */
1294 public boolean isSupportFailOver() {
1295 return this.supportFailOver;
1296 }
1297
1298 /**
1299 * @param supportFailOver the supportFailOver to set
1300 */
1301 public void setSupportFailOver(boolean supportFailOver) {
1302 this.supportFailOver = supportFailOver;
1303 }
1304
1305 /**
1306 * Looks up and lazily creates if necessary the destination for the given JMS name
1307 */
1308 public Destination getDestination(ActiveMQDestination destination) throws Exception {
1309 return getBroker().addDestination(getAdminConnectionContext(), destination);
1310 }
1311
1312 public void removeDestination(ActiveMQDestination destination) throws Exception {
1313 getBroker().removeDestination(getAdminConnectionContext(), destination,0);
1314 }
1315
1316 public int getProducerSystemUsagePortion() {
1317 return producerSystemUsagePortion;
1318 }
1319
1320 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1321 this.producerSystemUsagePortion = producerSystemUsagePortion;
1322 }
1323
1324 public int getConsumerSystemUsagePortion() {
1325 return consumerSystemUsagePortion;
1326 }
1327
1328 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1329 this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1330 }
1331
1332 public boolean isSplitSystemUsageForProducersConsumers() {
1333 return splitSystemUsageForProducersConsumers;
1334 }
1335
1336 public void setSplitSystemUsageForProducersConsumers(
1337 boolean splitSystemUsageForProducersConsumers) {
1338 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1339 }
1340
1341 public boolean isMonitorConnectionSplits() {
1342 return monitorConnectionSplits;
1343 }
1344
1345 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1346 this.monitorConnectionSplits = monitorConnectionSplits;
1347 }
1348 public int getTaskRunnerPriority() {
1349 return taskRunnerPriority;
1350 }
1351
1352 public void setTaskRunnerPriority(int taskRunnerPriority) {
1353 this.taskRunnerPriority = taskRunnerPriority;
1354 }
1355
1356 public boolean isDedicatedTaskRunner() {
1357 return dedicatedTaskRunner;
1358 }
1359
1360 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1361 this.dedicatedTaskRunner = dedicatedTaskRunner;
1362 }
1363
1364 public boolean isCacheTempDestinations() {
1365 return cacheTempDestinations;
1366 }
1367
1368 public void setCacheTempDestinations(boolean cacheTempDestinations) {
1369 this.cacheTempDestinations = cacheTempDestinations;
1370 }
1371
1372 public int getTimeBeforePurgeTempDestinations() {
1373 return timeBeforePurgeTempDestinations;
1374 }
1375
1376 public void setTimeBeforePurgeTempDestinations(
1377 int timeBeforePurgeTempDestinations) {
1378 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1379 }
1380
1381 public boolean isUseTempMirroredQueues() {
1382 return useTempMirroredQueues;
1383 }
1384
1385 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1386 this.useTempMirroredQueues = useTempMirroredQueues;
1387 }
1388 //
1389 // Implementation methods
1390 // -------------------------------------------------------------------------
1391 /**
1392 * Handles any lazy-creation helper properties which are added to make
1393 * things easier to configure inside environments such as Spring
1394 *
1395 * @throws Exception
1396 */
1397 protected void processHelperProperties() throws Exception {
1398 boolean masterServiceExists = false;
1399 if (transportConnectorURIs != null) {
1400 for (int i = 0; i < transportConnectorURIs.length; i++) {
1401 String uri = transportConnectorURIs[i];
1402 addConnector(uri);
1403 }
1404 }
1405 if (networkConnectorURIs != null) {
1406 for (int i = 0; i < networkConnectorURIs.length; i++) {
1407 String uri = networkConnectorURIs[i];
1408 addNetworkConnector(uri);
1409 }
1410 }
1411
1412 if (jmsBridgeConnectors != null) {
1413 for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1414 addJmsConnector(jmsBridgeConnectors[i]);
1415 }
1416 }
1417 for (Service service : services) {
1418 if (service instanceof MasterConnector) {
1419 masterServiceExists = true;
1420 break;
1421 }
1422 }
1423 if (masterConnectorURI != null) {
1424 if (masterServiceExists) {
1425 throw new IllegalStateException("Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1426 } else {
1427 addService(new MasterConnector(masterConnectorURI));
1428 }
1429 }
1430 }
1431
1432 protected void stopAllConnectors(ServiceStopper stopper) {
1433
1434 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1435 NetworkConnector connector = iter.next();
1436 unregisterNetworkConnectorMBean(connector);
1437 stopper.stop(connector);
1438 }
1439
1440 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1441 ProxyConnector connector = iter.next();
1442 stopper.stop(connector);
1443 }
1444
1445 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1446 JmsConnector connector = iter.next();
1447 stopper.stop(connector);
1448 }
1449
1450 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1451 TransportConnector connector = iter.next();
1452 stopper.stop(connector);
1453 }
1454 }
1455
1456 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1457 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1458 if (mbeanServer != null) {
1459
1460 try {
1461 ObjectName objectName = createConnectorObjectName(connector);
1462 connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), objectName);
1463 ConnectorViewMBean view = new ConnectorView(connector);
1464 mbeanServer.registerMBean(view, objectName);
1465 registeredMBeanNames.add(objectName);
1466 return connector;
1467 } catch (Throwable e) {
1468 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1469 }
1470 }
1471 return connector;
1472 }
1473
1474 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1475 if (isUseJmx()) {
1476 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1477 if (mbeanServer != null) {
1478 try {
1479 ObjectName objectName = createConnectorObjectName(connector);
1480
1481 if (registeredMBeanNames.remove(objectName)) {
1482 mbeanServer.unregisterMBean(objectName);
1483 }
1484 } catch (Throwable e) {
1485 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1486 }
1487 }
1488 }
1489 }
1490
1491 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1492 // MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1493 // if (mbeanServer != null) {
1494 //
1495 //
1496 // }
1497 return adaptor;
1498 }
1499
1500 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1501 if (isUseJmx()) {
1502 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1503 if (mbeanServer != null) {
1504
1505 }
1506 }
1507 }
1508
1509 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1510 return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector,"
1511 + "ConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1512 }
1513
1514 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1515 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1516 if (mbeanServer != null) {
1517 NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1518 try {
1519 ObjectName objectName = createNetworkConnectorObjectName(connector);
1520 connector.setObjectName(objectName);
1521 mbeanServer.registerMBean(view, objectName);
1522 registeredMBeanNames.add(objectName);
1523 } catch (Throwable e) {
1524 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1525 }
1526 }
1527 }
1528
1529 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
1530 return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1531 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1532 }
1533
1534 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1535 if (isUseJmx()) {
1536 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1537 if (mbeanServer != null) {
1538 try {
1539 ObjectName objectName = createNetworkConnectorObjectName(connector);
1540 if (registeredMBeanNames.remove(objectName)) {
1541 mbeanServer.unregisterMBean(objectName);
1542 }
1543 } catch (Exception e) {
1544 LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1545 }
1546 }
1547 }
1548 }
1549
1550 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1551 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1552 if (mbeanServer != null) {
1553 ProxyConnectorView view = new ProxyConnectorView(connector);
1554 try {
1555 ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1556 + "Type=ProxyConnector," + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1557 mbeanServer.registerMBean(view, objectName);
1558 registeredMBeanNames.add(objectName);
1559 } catch (Throwable e) {
1560 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1561 }
1562 }
1563 }
1564
1565 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1566 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1567 if (mbeanServer != null) {
1568 FTConnectorView view = new FTConnectorView(connector);
1569 try {
1570 ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1571 + "Type=MasterConnector");
1572 mbeanServer.registerMBean(view, objectName);
1573 registeredMBeanNames.add(objectName);
1574 } catch (Throwable e) {
1575 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1576 }
1577 }
1578 }
1579
1580 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1581 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1582 if (mbeanServer != null) {
1583 JmsConnectorView view = new JmsConnectorView(connector);
1584 try {
1585 ObjectName objectName = new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1586 + "Type=JmsConnector," + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1587 mbeanServer.registerMBean(view, objectName);
1588 registeredMBeanNames.add(objectName);
1589 } catch (Throwable e) {
1590 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1591 }
1592 }
1593 }
1594
1595 /**
1596 * Factory method to create a new broker
1597 *
1598 * @throws Exception
1599 * @throws
1600 * @throws
1601 */
1602 protected Broker createBroker() throws Exception {
1603 regionBroker = createRegionBroker();
1604 Broker broker = addInterceptors(regionBroker);
1605
1606 // Add a filter that will stop access to the broker once stopped
1607 broker = new MutableBrokerFilter(broker) {
1608 Broker old;
1609
1610 public void stop() throws Exception {
1611 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1612 // Just ignore additional stop actions.
1613 public void stop() throws Exception {
1614 }
1615
1616 });
1617 old.stop();
1618 }
1619
1620 public void start() throws Exception {
1621 if (forceStart && old != null) {
1622 this.next.set(old);
1623 }
1624 getNext().start();
1625 }
1626
1627 };
1628
1629 return broker;
1630
1631 }
1632
1633 /**
1634 * Factory method to create the core region broker onto which interceptors
1635 * are added
1636 *
1637 * @throws Exception
1638 */
1639 protected Broker createRegionBroker() throws Exception {
1640 if (destinationInterceptors == null) {
1641 destinationInterceptors = createDefaultDestinationInterceptor();
1642 }
1643 configureServices(destinationInterceptors);
1644
1645 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1646 if (destinationFactory == null) {
1647 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1648 }
1649 return createRegionBroker(destinationInterceptor);
1650 }
1651
1652 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1653 RegionBroker regionBroker;
1654 if (isUseJmx()) {
1655 MBeanServer mbeanServer = getManagementContext().getMBeanServer();
1656 regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
1657 destinationInterceptor);
1658 } else {
1659 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
1660 }
1661 destinationFactory.setRegionBroker(regionBroker);
1662
1663 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1664 regionBroker.setBrokerName(getBrokerName());
1665 regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
1666
1667 return regionBroker;
1668 }
1669
1670 /**
1671 * Create the default destination interceptor
1672 */
1673 protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
1674 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
1675 if (isUseVirtualTopics()) {
1676 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
1677 VirtualTopic virtualTopic = new VirtualTopic();
1678 virtualTopic.setName("VirtualTopic.>");
1679 VirtualDestination[] virtualDestinations = {virtualTopic};
1680 interceptor.setVirtualDestinations(virtualDestinations);
1681 answer.add(interceptor);
1682 }
1683 if (isUseMirroredQueues()) {
1684 MirroredQueue interceptor = new MirroredQueue();
1685 answer.add(interceptor);
1686 }
1687 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
1688 answer.toArray(array);
1689 return array;
1690 }
1691
1692 /**
1693 * Strategy method to add interceptors to the broker
1694 *
1695 * @throws IOException
1696 */
1697 protected Broker addInterceptors(Broker broker) throws Exception {
1698 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1699 if (isAdvisorySupport()) {
1700 broker = new AdvisoryBroker(broker);
1701 }
1702 broker = new CompositeDestinationBroker(broker);
1703 if (isPopulateJMSXUserID()) {
1704 broker = new UserIDBroker(broker);
1705 }
1706 if (isMonitorConnectionSplits()){
1707 broker = new ConnectionSplitBroker(broker);
1708 }
1709 if (plugins != null) {
1710 for (int i = 0; i < plugins.length; i++) {
1711 BrokerPlugin plugin = plugins[i];
1712 broker = plugin.installPlugin(broker);
1713 }
1714 }
1715 return broker;
1716 }
1717
1718 protected PersistenceAdapter createPersistenceAdapter() throws IOException {
1719 if (isPersistent()) {
1720 return getPersistenceFactory().createPersistenceAdapter();
1721 } else {
1722 return new MemoryPersistenceAdapter();
1723 }
1724 }
1725
1726 protected AMQPersistenceAdapterFactory createPersistenceFactory() {
1727 AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
1728 factory.setDataDirectory(getBrokerDataDirectory());
1729 factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory());
1730 factory.setBrokerName(getBrokerName());
1731 return factory;
1732 }
1733
1734 protected ObjectName createBrokerObjectName() throws IOException {
1735 try {
1736 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
1737 } catch (Throwable e) {
1738 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1739 }
1740 }
1741
1742 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
1743 TransportServer transport = TransportFactory.bind(this, brokerURI);
1744 return new TransportConnector(transport);
1745 }
1746
1747 /**
1748 * Extracts the port from the options
1749 */
1750 protected Object getPort(Map options) {
1751 Object port = options.get("port");
1752 if (port == null) {
1753 port = DEFAULT_PORT;
1754 LOG.warn("No port specified so defaulting to: " + port);
1755 }
1756 return port;
1757 }
1758
1759 protected void addShutdownHook() {
1760 if (useShutdownHook) {
1761 shutdownHook = new Thread("ActiveMQ ShutdownHook") {
1762 public void run() {
1763 containerShutdown();
1764 }
1765 };
1766 Runtime.getRuntime().addShutdownHook(shutdownHook);
1767 }
1768 }
1769
1770 protected void removeShutdownHook() {
1771 if (shutdownHook != null) {
1772 try {
1773 Runtime.getRuntime().removeShutdownHook(shutdownHook);
1774 } catch (Exception e) {
1775 LOG.debug("Caught exception, must be shutting down: " + e);
1776 }
1777 }
1778 }
1779
1780 /**
1781 * Causes a clean shutdown of the container when the VM is being shut down
1782 */
1783 protected void containerShutdown() {
1784 try {
1785 stop();
1786 } catch (IOException e) {
1787 Throwable linkedException = e.getCause();
1788 if (linkedException != null) {
1789 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
1790 } else {
1791 logError("Failed to shut down: " + e, e);
1792 }
1793 if (!useLoggingForShutdownErrors) {
1794 e.printStackTrace(System.err);
1795 }
1796 } catch (Exception e) {
1797 logError("Failed to shut down: " + e, e);
1798 }
1799 }
1800
1801 protected void logError(String message, Throwable e) {
1802 if (useLoggingForShutdownErrors) {
1803 LOG.error("Failed to shut down: " + e);
1804 } else {
1805 System.err.println("Failed to shut down: " + e);
1806 }
1807 }
1808
1809 /**
1810 * Starts any configured destinations on startup
1811 */
1812 protected void startDestinations() throws Exception {
1813 if (destinations != null) {
1814 ConnectionContext adminConnectionContext = getAdminConnectionContext();
1815
1816 for (int i = 0; i < destinations.length; i++) {
1817 ActiveMQDestination destination = destinations[i];
1818 getBroker().addDestination(adminConnectionContext, destination);
1819 }
1820 }
1821 }
1822
1823 /**
1824 * Returns the broker's administration connection context used for
1825 * configuring the broker at startup
1826 */
1827 public ConnectionContext getAdminConnectionContext() throws Exception {
1828 ConnectionContext adminConnectionContext = getBroker().getAdminConnectionContext();
1829 if (adminConnectionContext == null) {
1830 adminConnectionContext = createAdminConnectionContext();
1831 getBroker().setAdminConnectionContext(adminConnectionContext);
1832 }
1833 return adminConnectionContext;
1834 }
1835
1836 /**
1837 * Factory method to create the new administration connection context
1838 * object. Note this method is here rather than inside a default broker
1839 * implementation to ensure that the broker reference inside it is the outer
1840 * most interceptor
1841 */
1842 protected ConnectionContext createAdminConnectionContext() throws Exception {
1843 ConnectionContext context = new ConnectionContext();
1844 context.setBroker(getBroker());
1845 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1846 return context;
1847 }
1848
1849 protected void waitForSlave(){
1850 try {
1851 slaveStartSignal.await();
1852 }catch(InterruptedException e){
1853 LOG.error("Exception waiting for slave:"+e);
1854 }
1855 }
1856
1857 protected void slaveConnectionEstablished(){
1858 slaveStartSignal.countDown();
1859 }
1860
1861
1862 /**
1863 * Start all transport and network connections, proxies and bridges
1864 *
1865 * @throws Exception
1866 */
1867 protected void startAllConnectors() throws Exception {
1868 if (!isSlave()) {
1869 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
1870 List<TransportConnector> al = new ArrayList<TransportConnector>();
1871
1872 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1873 TransportConnector connector = iter.next();
1874 connector.setBrokerService(this);
1875 al.add(startTransportConnector(connector));
1876 }
1877
1878 if (al.size() > 0) {
1879 // let's clear the transportConnectors list and replace it with
1880 // the started transportConnector instances
1881 this.transportConnectors.clear();
1882 setTransportConnectors(al);
1883 }
1884 URI uri = getVmConnectorURI();
1885 Map<String, String> map = new HashMap<String, String>(URISupport.parseParamters(uri));
1886 map.put("network", "true");
1887 map.put("async", "false");
1888 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1889 if(isWaitForSlave()){
1890 waitForSlave();
1891 }
1892 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1893 NetworkConnector connector = iter.next();
1894 connector.setLocalUri(uri);
1895 connector.setBrokerName(getBrokerName());
1896 connector.setDurableDestinations(durableDestinations);
1897 connector.start();
1898 }
1899
1900 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1901 ProxyConnector connector = iter.next();
1902 connector.start();
1903 }
1904
1905 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1906 JmsConnector connector = iter.next();
1907 connector.start();
1908 }
1909 for (Service service:services) {
1910 configureService(service);
1911 service.start();
1912 }
1913 }
1914 }
1915
1916 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
1917 connector.setTaskRunnerFactory(getTaskRunnerFactory());
1918 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
1919 if (policy != null) {
1920 connector.setMessageAuthorizationPolicy(policy);
1921 }
1922
1923 if (isUseJmx()) {
1924 connector = registerConnectorMBean(connector);
1925 }
1926
1927 connector.getStatistics().setEnabled(enableStatistics);
1928
1929 connector.start();
1930
1931 return connector;
1932 }
1933
1934 /**
1935 * Perform any custom dependency injection
1936 */
1937 protected void configureServices(Object[] services) {
1938 for (Object service : services) {
1939 configureService(service);
1940 }
1941 }
1942
1943 /**
1944 * Perform any custom dependency injection
1945 */
1946 protected void configureService(Object service) {
1947 if (service instanceof BrokerServiceAware) {
1948 BrokerServiceAware serviceAware = (BrokerServiceAware) service;
1949 serviceAware.setBrokerService(this);
1950 }
1951 if (masterConnector == null) {
1952 if (service instanceof MasterConnector) {
1953 masterConnector = (MasterConnector) service;
1954 supportFailOver = true;
1955 }
1956 }
1957 }
1958
1959 /**
1960 * Starts all destiantions in persistence store. This includes all inactive
1961 * destinations
1962 */
1963 protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
1964 Set destinations = destinationFactory.getDestinations();
1965 if (destinations != null) {
1966 Iterator iter = destinations.iterator();
1967
1968 ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
1969 if (adminConnectionContext == null) {
1970 ConnectionContext context = new ConnectionContext();
1971 context.setBroker(broker);
1972 adminConnectionContext = context;
1973 broker.setAdminConnectionContext(adminConnectionContext);
1974 }
1975
1976 while (iter.hasNext()) {
1977 ActiveMQDestination destination = (ActiveMQDestination)iter.next();
1978 broker.addDestination(adminConnectionContext, destination);
1979 }
1980 }
1981 }
1982
1983 public Broker getRegionBroker() {
1984 return regionBroker;
1985 }
1986
1987 public void setRegionBroker(Broker regionBroker) {
1988 this.regionBroker = regionBroker;
1989 }
1990
1991
1992 public void addShutdownHook(Runnable hook) {
1993 synchronized(shutdownHooks) {
1994 shutdownHooks.add(hook);
1995 }
1996 }
1997
1998 public void removeShutdownHook(Runnable hook) {
1999 synchronized(shutdownHooks) {
2000 shutdownHooks.remove(hook);
2001 }
2002 }
2003
2004 public boolean isSystemExitOnShutdown() {
2005 return systemExitOnShutdown;
2006 }
2007
2008 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2009 this.systemExitOnShutdown = systemExitOnShutdown;
2010 }
2011
2012 public int getSystemExitOnShutdownExitCode() {
2013 return systemExitOnShutdownExitCode;
2014 }
2015
2016 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2017 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2018 }
2019
2020 public SslContext getSslContext() {
2021 return sslContext;
2022 }
2023
2024 public void setSslContext(SslContext sslContext) {
2025 this.sslContext = sslContext;
2026 }
2027
2028 public boolean isShutdownOnSlaveFailure() {
2029 return shutdownOnSlaveFailure;
2030 }
2031
2032 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2033 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2034 }
2035
2036 public boolean isWaitForSlave() {
2037 return waitForSlave;
2038 }
2039
2040 public void setWaitForSlave(boolean waitForSlave) {
2041 this.waitForSlave = waitForSlave;
2042 }
2043
2044 public CountDownLatch getSlaveStartSignal() {
2045 return slaveStartSignal;
2046 }
2047
2048 }