001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.store.journal;
019    
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.LinkedHashMap;
024    import java.util.Map;
025    
026    import javax.transaction.xa.XAException;
027    
028    import org.apache.activeio.journal.RecordLocation;
029    import org.apache.activemq.command.JournalTopicAck;
030    import org.apache.activemq.command.JournalTransaction;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.TransactionId;
034    import org.apache.activemq.command.XATransactionId;
035    import org.apache.activemq.store.TransactionRecoveryListener;
036    import org.apache.activemq.store.TransactionStore;
037    
038    /**
039     */
040    public class JournalTransactionStore implements TransactionStore {
041    
042        private final JournalPersistenceAdapter peristenceAdapter;
043        private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
044        private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
045        private boolean doingRecover;
046    
047        public static class TxOperation {
048    
049            static final byte ADD_OPERATION_TYPE = 0;
050            static final byte REMOVE_OPERATION_TYPE = 1;
051            static final byte ACK_OPERATION_TYPE = 3;
052    
053            public byte operationType;
054            public JournalMessageStore store;
055            public Object data;
056    
057            public TxOperation(byte operationType, JournalMessageStore store, Object data) {
058                this.operationType = operationType;
059                this.store = store;
060                this.data = data;
061            }
062    
063        }
064    
065        /**
066         * Operations
067         * 
068         * @version $Revision: 1.6 $
069         */
070        public static class Tx {
071    
072            private final RecordLocation location;
073            private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
074    
075            public Tx(RecordLocation location) {
076                this.location = location;
077            }
078    
079            public void add(JournalMessageStore store, Message msg) {
080                operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
081            }
082    
083            public void add(JournalMessageStore store, MessageAck ack) {
084                operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
085            }
086    
087            public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
088                operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
089            }
090    
091            public Message[] getMessages() {
092                ArrayList<Object> list = new ArrayList<Object>();
093                for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
094                    TxOperation op = iter.next();
095                    if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
096                        list.add(op.data);
097                    }
098                }
099                Message rc[] = new Message[list.size()];
100                list.toArray(rc);
101                return rc;
102            }
103    
104            public MessageAck[] getAcks() {
105                ArrayList<Object> list = new ArrayList<Object>();
106                for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
107                    TxOperation op = iter.next();
108                    if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
109                        list.add(op.data);
110                    }
111                }
112                MessageAck rc[] = new MessageAck[list.size()];
113                list.toArray(rc);
114                return rc;
115            }
116    
117            public ArrayList<TxOperation> getOperations() {
118                return operations;
119            }
120    
121        }
122    
123        public JournalTransactionStore(JournalPersistenceAdapter adapter) {
124            this.peristenceAdapter = adapter;
125        }
126    
127        /**
128         * @throws IOException
129         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
130         */
131        public void prepare(TransactionId txid) throws IOException {
132            Tx tx = null;
133            synchronized (inflightTransactions) {
134                tx = inflightTransactions.remove(txid);
135            }
136            if (tx == null) {
137                return;
138            }
139            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
140                                           true);
141            synchronized (preparedTransactions) {
142                preparedTransactions.put(txid, tx);
143            }
144        }
145    
146        /**
147         * @throws IOException
148         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
149         */
150        public void replayPrepare(TransactionId txid) throws IOException {
151            Tx tx = null;
152            synchronized (inflightTransactions) {
153                tx = inflightTransactions.remove(txid);
154            }
155            if (tx == null) {
156                return;
157            }
158            synchronized (preparedTransactions) {
159                preparedTransactions.put(txid, tx);
160            }
161        }
162    
163        public Tx getTx(Object txid, RecordLocation location) {
164            Tx tx = null;
165            synchronized (inflightTransactions) {
166                tx = inflightTransactions.get(txid);
167            }
168            if (tx == null) {
169                tx = new Tx(location);
170                inflightTransactions.put(txid, tx);
171            }
172            return tx;
173        }
174    
175        /**
176         * @throws XAException
177         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
178         */
179        public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
180            Tx tx;
181            if (wasPrepared) {
182                synchronized (preparedTransactions) {
183                    tx = preparedTransactions.remove(txid);
184                }
185            } else {
186                synchronized (inflightTransactions) {
187                    tx = inflightTransactions.remove(txid);
188                }
189            }
190            if (tx == null) {
191                return;
192            }
193            if (txid.isXATransaction()) {
194                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
195                                                                      wasPrepared), true);
196            } else {
197                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
198                                                                      wasPrepared), true);
199            }
200        }
201    
202        /**
203         * @throws XAException
204         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
205         */
206        public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
207            if (wasPrepared) {
208                synchronized (preparedTransactions) {
209                    return preparedTransactions.remove(txid);
210                }
211            } else {
212                synchronized (inflightTransactions) {
213                    return inflightTransactions.remove(txid);
214                }
215            }
216        }
217    
218        /**
219         * @throws IOException
220         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
221         */
222        public void rollback(TransactionId txid) throws IOException {
223            Tx tx = null;
224            synchronized (inflightTransactions) {
225                tx = inflightTransactions.remove(txid);
226            }
227            if (tx != null) {
228                synchronized (preparedTransactions) {
229                    tx = preparedTransactions.remove(txid);
230                }
231            }
232            if (tx != null) {
233                if (txid.isXATransaction()) {
234                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
235                                                                          false), true);
236                } else {
237                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
238                                                                          txid, false), true);
239                }
240            }
241        }
242    
243        /**
244         * @throws IOException
245         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
246         */
247        public void replayRollback(TransactionId txid) throws IOException {
248            boolean inflight = false;
249            synchronized (inflightTransactions) {
250                inflight = inflightTransactions.remove(txid) != null;
251            }
252            if (inflight) {
253                synchronized (preparedTransactions) {
254                    preparedTransactions.remove(txid);
255                }
256            }
257        }
258    
259        public void start() throws Exception {
260        }
261    
262        public void stop() throws Exception {
263        }
264    
265        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
266            // All the in-flight transactions get rolled back..
267            synchronized (inflightTransactions) {
268                inflightTransactions.clear();
269            }
270            this.doingRecover = true;
271            try {
272                Map<TransactionId, Tx> txs = null;
273                synchronized (preparedTransactions) {
274                    txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
275                }
276                for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
277                    Object txid = iter.next();
278                    Tx tx = txs.get(txid);
279                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
280                }
281            } finally {
282                this.doingRecover = false;
283            }
284        }
285    
286        /**
287         * @param message
288         * @throws IOException
289         */
290        void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException {
291            Tx tx = getTx(message.getTransactionId(), location);
292            tx.add(store, message);
293        }
294    
295        /**
296         * @param ack
297         * @throws IOException
298         */
299        public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
300            throws IOException {
301            Tx tx = getTx(ack.getTransactionId(), location);
302            tx.add(store, ack);
303        }
304    
305        public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
306            Tx tx = getTx(ack.getTransactionId(), location);
307            tx.add(store, ack);
308        }
309    
310        public RecordLocation checkpoint() throws IOException {
311            // Nothing really to checkpoint.. since, we don't
312            // checkpoint tx operations in to long term store until they are
313            // committed.
314            // But we keep track of the first location of an operation
315            // that was associated with an active tx. The journal can not
316            // roll over active tx records.
317            RecordLocation rc = null;
318            synchronized (inflightTransactions) {
319                for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
320                    Tx tx = iter.next();
321                    RecordLocation location = tx.location;
322                    if (rc == null || rc.compareTo(location) < 0) {
323                        rc = location;
324                    }
325                }
326            }
327            synchronized (preparedTransactions) {
328                for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
329                    Tx tx = iter.next();
330                    RecordLocation location = tx.location;
331                    if (rc == null || rc.compareTo(location) < 0) {
332                        rc = location;
333                    }
334                }
335                return rc;
336            }
337        }
338    
339        public boolean isDoingRecover() {
340            return doingRecover;
341        }
342    
343    }