001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadaptor;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.Map;
022 import java.util.Map.Entry;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import javax.transaction.xa.XAException;
026
027 import org.apache.activemq.broker.ConnectionContext;
028 import org.apache.activemq.command.Message;
029 import org.apache.activemq.command.MessageAck;
030 import org.apache.activemq.command.TransactionId;
031 import org.apache.activemq.command.XATransactionId;
032 import org.apache.activemq.store.MessageStore;
033 import org.apache.activemq.store.ProxyMessageStore;
034 import org.apache.activemq.store.ProxyTopicMessageStore;
035 import org.apache.activemq.store.TopicMessageStore;
036 import org.apache.activemq.store.TransactionRecoveryListener;
037 import org.apache.activemq.store.TransactionStore;
038
039 /**
040 * Provides a TransactionStore implementation that can create transaction aware
041 * MessageStore objects from non transaction aware MessageStore objects.
042 *
043 * @version $Revision: 1.4 $
044 */
045 public class KahaTransactionStore implements TransactionStore {
046 private Map transactions = new ConcurrentHashMap();
047 private Map prepared;
048 private KahaPersistenceAdapter adaptor;
049
050 KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
051 this.adaptor = adaptor;
052 this.prepared = preparedMap;
053 }
054
055 public MessageStore proxy(MessageStore messageStore) {
056 return new ProxyMessageStore(messageStore) {
057 public void addMessage(ConnectionContext context, final Message send) throws IOException {
058 KahaTransactionStore.this.addMessage(getDelegate(), send);
059 }
060
061 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
062 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
063 }
064 };
065 }
066
067 public TopicMessageStore proxy(TopicMessageStore messageStore) {
068 return new ProxyTopicMessageStore(messageStore) {
069 public void addMessage(ConnectionContext context, final Message send) throws IOException {
070 KahaTransactionStore.this.addMessage(getDelegate(), send);
071 }
072
073 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
074 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
075 }
076 };
077 }
078
079 /**
080 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
081 */
082 public void prepare(TransactionId txid) {
083 KahaTransaction tx = getTx(txid);
084 if (tx != null) {
085 tx.prepare();
086 prepared.put(txid, tx);
087 }
088 }
089
090 /**
091 * @throws XAException
092 * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
093 */
094 public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
095 KahaTransaction tx = getTx(txid);
096 if (tx != null) {
097 tx.commit(this);
098 removeTx(txid);
099 }
100 }
101
102 /**
103 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
104 */
105 public void rollback(TransactionId txid) {
106 KahaTransaction tx = getTx(txid);
107 if (tx != null) {
108 tx.rollback();
109 removeTx(txid);
110 }
111 }
112
113 public void start() throws Exception {
114 }
115
116 public void stop() throws Exception {
117 }
118
119 public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
120 for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
121 Map.Entry entry = (Entry)i.next();
122 XATransactionId xid = (XATransactionId)entry.getKey();
123 KahaTransaction kt = (KahaTransaction)entry.getValue();
124 listener.recover(xid, kt.getMessages(), kt.getAcks());
125 }
126 }
127
128 /**
129 * @param message
130 * @throws IOException
131 */
132 void addMessage(final MessageStore destination, final Message message) throws IOException {
133 if (message.isInTransaction()) {
134 KahaTransaction tx = getOrCreateTx(message.getTransactionId());
135 tx.add((KahaMessageStore)destination, message);
136 } else {
137 destination.addMessage(null, message);
138 }
139 }
140
141 /**
142 * @param ack
143 * @throws IOException
144 */
145 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
146 if (ack.isInTransaction()) {
147 KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
148 tx.add((KahaMessageStore)destination, ack);
149 } else {
150 destination.removeMessage(null, ack);
151 }
152 }
153
154 protected synchronized KahaTransaction getTx(TransactionId key) {
155 KahaTransaction result = (KahaTransaction)transactions.get(key);
156 if (result == null) {
157 result = (KahaTransaction)prepared.get(key);
158 }
159 return result;
160 }
161
162 protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
163 KahaTransaction result = (KahaTransaction)transactions.get(key);
164 if (result == null) {
165 result = new KahaTransaction();
166 transactions.put(key, result);
167 }
168 return result;
169 }
170
171 protected synchronized void removeTx(TransactionId key) {
172 transactions.remove(key);
173 prepared.remove(key);
174 }
175
176 public void delete() {
177 transactions.clear();
178 prepared.clear();
179 }
180
181 protected MessageStore getStoreById(Object id) {
182 return adaptor.retrieveMessageStore(id);
183 }
184 }