001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.camel.loanbroker.queue.version;
019    
020    import java.io.File;
021    
022    import org.apache.activemq.broker.BrokerService;
023    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
024    
025    public final class JmsBroker {
026        JMSEmbeddedBroker jmsBrokerThread;
027        String jmsBrokerUrl = "tcp://localhost:51616";
028        String activeMQStorageDir;
029        public JmsBroker() {
030        }
031    
032        public JmsBroker(String url) {
033            jmsBrokerUrl = url;
034        }
035        
036        public void start() throws Exception {
037            jmsBrokerThread = new JMSEmbeddedBroker(jmsBrokerUrl);
038            jmsBrokerThread.startBroker();
039        }
040        
041        public void stop() throws Exception {
042            synchronized (this) {
043                jmsBrokerThread.shutdownBroker = true;
044            }
045            if (jmsBrokerThread != null) {
046                jmsBrokerThread.join();
047            }
048            
049            jmsBrokerThread = null;
050        }
051        
052        class JMSEmbeddedBroker extends Thread {
053            boolean shutdownBroker;
054            final String brokerUrl;
055            Exception exception;
056            
057            
058            public JMSEmbeddedBroker(String url) {
059                brokerUrl = url;
060            }
061            
062            public void startBroker() throws Exception {
063                synchronized (this) {
064                    super.start();
065                    try {
066                        wait();
067                        if (exception != null) {
068                            throw exception;
069                        }
070                    } catch (InterruptedException ex) {
071                        ex.printStackTrace();
072                    }
073                }
074            }
075            
076            public void run() {
077                try {  
078                    //ContainerWapper container;
079                    BrokerService broker = new BrokerService();
080                    synchronized (this) {                                     
081                        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());                    
082                        broker.setTmpDataDirectory(new File("./target"));
083                        broker.addConnector(brokerUrl);
084                        broker.start();
085                        Thread.sleep(200);
086                        notifyAll();
087                    }
088                    synchronized (this) {
089                        while (!shutdownBroker) {
090                            wait(1000);
091                        }
092                    }                
093                    broker.stop();              
094                    broker = null;                
095                } catch (Exception e) {
096                    exception = e;
097                    e.printStackTrace();
098                }
099            }
100        }
101    }
102