001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.List;
024    
025    import org.apache.activemq.command.BaseCommand;
026    import org.apache.activemq.kaha.Marshaller;
027    import org.apache.activemq.util.ByteSequence;
028    import org.apache.activemq.wireformat.WireFormat;
029    
030    /**
031     * Marshall a Transaction
032     * 
033     * @version $Revision: 1.10 $
034     */
035    public class TransactionMarshaller implements Marshaller {
036    
037        private WireFormat wireFormat;
038    
039        public TransactionMarshaller(WireFormat wireFormat) {
040            this.wireFormat = wireFormat;
041    
042        }
043    
044        public void writePayload(Object object, DataOutput dataOut) throws IOException {
045            KahaTransaction kt = (KahaTransaction)object;
046            List list = kt.getList();
047            dataOut.writeInt(list.size());
048            for (int i = 0; i < list.size(); i++) {
049                TxCommand tx = (TxCommand)list.get(i);
050                Object key = tx.getMessageStoreKey();
051                ByteSequence packet = wireFormat.marshal(key);
052                dataOut.writeInt(packet.length);
053                dataOut.write(packet.data, packet.offset, packet.length);
054                Object command = tx.getCommand();
055                packet = wireFormat.marshal(command);
056                dataOut.writeInt(packet.length);
057                dataOut.write(packet.data, packet.offset, packet.length);
058    
059            }
060        }
061    
062        public Object readPayload(DataInput dataIn) throws IOException {
063            KahaTransaction result = new KahaTransaction();
064            List list = new ArrayList();
065            result.setList(list);
066            int number = dataIn.readInt();
067            for (int i = 0; i < number; i++) {
068                TxCommand command = new TxCommand();
069                int size = dataIn.readInt();
070                byte[] data = new byte[size];
071                dataIn.readFully(data);
072                Object key = wireFormat.unmarshal(new ByteSequence(data));
073                command.setMessageStoreKey(key);
074                size = dataIn.readInt();
075                data = new byte[size];
076                dataIn.readFully(data);
077                BaseCommand bc = (BaseCommand)wireFormat.unmarshal(new ByteSequence(data));
078                command.setCommand(bc);
079                list.add(command);
080            }
081            return result;
082    
083        }
084    }