001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.Map.Entry;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import javax.transaction.xa.XAException;
026    
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.TransactionId;
031    import org.apache.activemq.command.XATransactionId;
032    import org.apache.activemq.store.MessageStore;
033    import org.apache.activemq.store.ProxyMessageStore;
034    import org.apache.activemq.store.ProxyTopicMessageStore;
035    import org.apache.activemq.store.TopicMessageStore;
036    import org.apache.activemq.store.TransactionRecoveryListener;
037    import org.apache.activemq.store.TransactionStore;
038    
039    /**
040     * Provides a TransactionStore implementation that can create transaction aware
041     * MessageStore objects from non transaction aware MessageStore objects.
042     * 
043     * @version $Revision: 1.4 $
044     */
045    public class KahaTransactionStore implements TransactionStore {
046        private Map transactions = new ConcurrentHashMap();
047        private Map prepared;
048        private KahaPersistenceAdapter adaptor;
049    
050        KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
051            this.adaptor = adaptor;
052            this.prepared = preparedMap;
053        }
054    
055        public MessageStore proxy(MessageStore messageStore) {
056            return new ProxyMessageStore(messageStore) {
057                public void addMessage(ConnectionContext context, final Message send) throws IOException {
058                    KahaTransactionStore.this.addMessage(getDelegate(), send);
059                }
060    
061                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
062                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
063                }
064            };
065        }
066    
067        public TopicMessageStore proxy(TopicMessageStore messageStore) {
068            return new ProxyTopicMessageStore(messageStore) {
069                public void addMessage(ConnectionContext context, final Message send) throws IOException {
070                    KahaTransactionStore.this.addMessage(getDelegate(), send);
071                }
072    
073                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
074                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
075                }
076            };
077        }
078    
079        /**
080         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
081         */
082        public void prepare(TransactionId txid) {
083            KahaTransaction tx = getTx(txid);
084            if (tx != null) {
085                tx.prepare();
086                prepared.put(txid, tx);
087            }
088        }
089    
090        /**
091         * @throws XAException
092         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
093         */
094        public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
095            KahaTransaction tx = getTx(txid);
096            if (tx != null) {
097                tx.commit(this);
098                removeTx(txid);
099            }
100        }
101    
102        /**
103         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
104         */
105        public void rollback(TransactionId txid) {
106            KahaTransaction tx = getTx(txid);
107            if (tx != null) {
108                tx.rollback();
109                removeTx(txid);
110            }
111        }
112    
113        public void start() throws Exception {
114        }
115    
116        public void stop() throws Exception {
117        }
118    
119        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
120            for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
121                Map.Entry entry = (Entry)i.next();
122                XATransactionId xid = (XATransactionId)entry.getKey();
123                KahaTransaction kt = (KahaTransaction)entry.getValue();
124                listener.recover(xid, kt.getMessages(), kt.getAcks());
125            }
126        }
127    
128        /**
129         * @param message
130         * @throws IOException
131         */
132        void addMessage(final MessageStore destination, final Message message) throws IOException {
133            if (message.isInTransaction()) {
134                KahaTransaction tx = getOrCreateTx(message.getTransactionId());
135                tx.add((KahaMessageStore)destination, message);
136            } else {
137                destination.addMessage(null, message);
138            }
139        }
140    
141        /**
142         * @param ack
143         * @throws IOException
144         */
145        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
146            if (ack.isInTransaction()) {
147                KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
148                tx.add((KahaMessageStore)destination, ack);
149            } else {
150                destination.removeMessage(null, ack);
151            }
152        }
153    
154        protected synchronized KahaTransaction getTx(TransactionId key) {
155            KahaTransaction result = (KahaTransaction)transactions.get(key);
156            if (result == null) {
157                result = (KahaTransaction)prepared.get(key);
158            }
159            return result;
160        }
161    
162        protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
163            KahaTransaction result = (KahaTransaction)transactions.get(key);
164            if (result == null) {
165                result = new KahaTransaction();
166                transactions.put(key, result);
167            }
168            return result;
169        }
170    
171        protected synchronized void removeTx(TransactionId key) {
172            transactions.remove(key);
173            prepared.remove(key);
174        }
175    
176        public void delete() {
177            transactions.clear();
178            prepared.clear();
179        }
180    
181        protected MessageStore getStoreById(Object id) {
182            return adaptor.retrieveMessageStore(id);
183        }
184    }