001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.jms.reply;
018    
019    import java.util.concurrent.ScheduledExecutorService;
020    import javax.jms.Destination;
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.ExchangeTimedOutException;
027    import org.apache.camel.component.jms.JmsEndpoint;
028    import org.apache.camel.component.jms.JmsMessage;
029    import org.apache.camel.component.jms.JmsMessageHelper;
030    import org.apache.camel.impl.ServiceSupport;
031    import org.apache.camel.util.ObjectHelper;
032    import org.apache.camel.util.ServiceHelper;
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    import org.springframework.jms.listener.AbstractMessageListenerContainer;
036    
037    /**
038     * Base class for {@link ReplyManager} implementations.
039     *
040     * @version $Revision: 20573 $
041     */
042    public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
043    
044        protected final Log log = LogFactory.getLog(getClass());
045        protected ScheduledExecutorService executorService;
046        protected JmsEndpoint endpoint;
047        protected Destination replyTo;
048        protected AbstractMessageListenerContainer listenerContainer;
049        protected long replyToResolverTimeout = 5000;
050        protected CorrelationMap correlation;
051    
052        public void setScheduledExecutorService(ScheduledExecutorService executorService) {
053            this.executorService = executorService;
054        }
055    
056        public void setEndpoint(JmsEndpoint endpoint) {
057            this.endpoint = endpoint;
058        }
059    
060        public void setReplyTo(Destination replyTo) {
061            if (log.isTraceEnabled()) {
062                log.trace("ReplyTo destination: " + replyTo);
063            }
064            this.replyTo = replyTo;
065        }
066    
067        public Destination getReplyTo() {
068            synchronized (this) {
069                try {
070                    // wait for the reply to destination to be resolved
071                    if (replyTo == null) {
072                        wait(replyToResolverTimeout);
073                    }
074                } catch (Throwable e) {
075                    // ignore
076                }
077            }
078            return replyTo;
079        }
080    
081        public void onMessage(Message message) {
082            String correlationID = null;
083            try {
084                correlationID = message.getJMSCorrelationID();
085            } catch (JMSException e) {
086                // ignore
087            }
088            if (correlationID == null) {
089                log.warn("Ignoring message with no correlationID: " + message);
090                return;
091            }
092    
093            if (log.isDebugEnabled()) {
094                log.debug("Received reply message with correlationID: " + correlationID + " -> " + message);
095            }
096    
097            // handle the reply message
098            handleReplyMessage(correlationID, message);
099        }
100    
101        public void processReply(ReplyHolder holder) {
102            if (holder != null && isRunAllowed()) {
103                Exchange exchange = holder.getExchange();
104                Message message = holder.getMessage();
105    
106                boolean timeout = holder.isTimeout();
107                if (timeout) {
108                    // no response, so lets set a timed out exception
109                    exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
110                } else {
111                    JmsMessage response = new JmsMessage(message, endpoint.getBinding());
112                    Object body = response.getBody();
113    
114                    if (endpoint.isTransferException() && body instanceof Exception) {
115                        if (log.isDebugEnabled()) {
116                            log.debug("Reply received. Setting reply as an Exception: " + body);
117                        }
118                        // we got an exception back and endpoint was configured to transfer exception
119                        // therefore set response as exception
120                        exchange.setException((Exception) body);
121                    } else {
122                        if (log.isDebugEnabled()) {
123                            log.debug("Reply received. Setting reply as OUT message: " + body);
124                        }
125                        // regular response
126                        exchange.setOut(response);
127                    }
128    
129                    // restore correlation id in case the remote server messed with it
130                    if (holder.getOriginalCorrelationId() != null) {
131                        JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
132                        exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
133                    }
134                }
135    
136                // notify callback
137                AsyncCallback callback = holder.getCallback();
138                callback.done(false);
139            }
140        }
141    
142        protected abstract void handleReplyMessage(String correlationID, Message message);
143    
144        protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception;
145    
146        /**
147         * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only
148         * testing using InOut over JMS. Its unlikely to happen in a real life situation with communication
149         * to a remote broker, which always will be slower to send back reply, before Camel had a chance
150         * to update it's internal correlation map.
151         */
152        protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
153            // race condition, when using messageID as correlationID then we store a provisional correlation id
154            // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
155            // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
156            // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
157            if (log.isWarnEnabled()) {
158                log.warn("Early reply received with correlationID [" + correlationID + "] -> " + message);
159            }
160    
161            ReplyHandler answer = null;
162    
163            // wait up till 5 seconds
164            boolean done = false;
165            int counter = 0;
166            while (!done && counter++ < 50) {
167                if (log.isTraceEnabled()) {
168                    log.trace("Early reply not found handler at attempt " + counter + ". Waiting a bit longer.");
169                }
170                try {
171                    Thread.sleep(100);
172                } catch (InterruptedException e) {
173                    // ignore
174                }
175    
176                // try again
177                answer = correlation.get(correlationID);
178                done = answer != null;
179    
180                if (answer != null) {
181                    if (log.isTraceEnabled()) {
182                        log.trace("Early reply with correlationID [" + correlationID + "] has been matched after "
183                                + counter + " attempts and can be processed using handler: " + answer);
184                    }
185                }
186            }
187    
188            return answer;
189        }
190    
191        @Override
192        protected void doStart() throws Exception {
193            ObjectHelper.notNull(executorService, "executorService", this);
194            ObjectHelper.notNull(endpoint, "endpoint", this);
195    
196            // purge for timeout every second
197            correlation = new CorrelationMap(executorService, 1000);
198            ServiceHelper.startService(correlation);
199    
200            // create JMS listener and start it
201            listenerContainer = createListenerContainer();
202            listenerContainer.afterPropertiesSet();
203            listenerContainer.start();
204        }
205    
206        @Override
207        protected void doStop() throws Exception {
208            ServiceHelper.stopService(correlation);
209    
210            if (listenerContainer != null) {
211                listenerContainer.stop();
212                listenerContainer.destroy();
213                listenerContainer = null;
214            }
215        }
216    
217    }