001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.jms.reply;
018    
019    import javax.jms.Destination;
020    import javax.jms.ExceptionListener;
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    import javax.jms.Session;
024    import javax.jms.TemporaryQueue;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.util.IntrospectionSupport;
029    import org.springframework.core.task.TaskExecutor;
030    import org.springframework.jms.listener.AbstractMessageListenerContainer;
031    import org.springframework.jms.listener.DefaultMessageListenerContainer;
032    import org.springframework.jms.support.destination.DestinationResolver;
033    
034    /**
035     * A {@link ReplyManager} when using temporary queues.
036     *
037     * @version $Revision: 21895 $
038     */
039    public class TemporaryQueueReplyManager extends ReplyManagerSupport {
040    
041        public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
042                                    String originalCorrelationId, String correlationId, long requestTimeout) {
043            // add to correlation map
044            TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, requestTimeout);
045            correlation.put(correlationId, handler, requestTimeout);
046            return correlationId;
047        }
048    
049        public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) {
050            if (log.isTraceEnabled()) {
051                log.trace("Updated provisional correlationId [" + correlationId + "] to expected correlationId [" + newCorrelationId + "]");
052            }
053    
054            ReplyHandler handler = correlation.remove(correlationId);
055            correlation.put(newCorrelationId, handler, requestTimeout);
056        }
057    
058        @Override
059        protected void handleReplyMessage(String correlationID, Message message) {
060            ReplyHandler handler = correlation.get(correlationID);
061            if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
062                handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
063            }
064    
065            if (handler != null) {
066                try {
067                    handler.onReply(correlationID, message);
068                } finally {
069                    correlation.remove(correlationID);
070                }
071            } else {
072                // we could not correlate the received reply message to a matching request and therefore
073                // we cannot continue routing the unknown message
074                String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
075                log.warn(text);
076                throw new UnknownReplyMessageException(text, message, correlationID);
077            }
078        }
079    
080        public void setReplyToSelectorHeader(org.apache.camel.Message camelMessage, Message jmsMessage) throws JMSException {
081            // noop
082        }
083    
084        @Override
085        protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
086            // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
087            DefaultMessageListenerContainer answer = new DefaultMessageListenerContainer();
088    
089            answer.setDestinationName("temporary");
090            answer.setDestinationResolver(new DestinationResolver() {
091                public Destination resolveDestinationName(Session session, String destinationName,
092                                                          boolean pubSubDomain) throws JMSException {
093                    // use a temporary queue to gather the reply message
094                    TemporaryQueue queue = null;
095                    synchronized (TemporaryQueueReplyManager.this) {
096                        try {
097                            queue = session.createTemporaryQueue();
098                            setReplyTo(queue);
099                        } finally {
100                            TemporaryQueueReplyManager.this.notifyAll();
101                        }
102                    }
103                    return queue;
104                }
105            });
106            answer.setAutoStartup(true);
107            answer.setMessageListener(this);
108            answer.setPubSubDomain(false);
109            answer.setSubscriptionDurable(false);
110            answer.setConcurrentConsumers(1);
111            answer.setConnectionFactory(endpoint.getConnectionFactory());
112            answer.setSessionTransacted(false);
113            String clientId = endpoint.getClientId();
114            if (clientId != null) {
115                clientId += ".CamelReplyManager";
116                answer.setClientId(clientId);
117            }
118            TaskExecutor taskExecutor = endpoint.getTaskExecutor();
119            if (taskExecutor != null) {
120                answer.setTaskExecutor(taskExecutor);
121            }
122            if (endpoint.getTaskExecutorSpring2() != null) {
123                // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
124                IntrospectionSupport.setProperty(answer, "taskExecutor", endpoint.getTaskExecutorSpring2());
125            }
126            ExceptionListener exceptionListener = endpoint.getExceptionListener();
127            if (exceptionListener != null) {
128                answer.setExceptionListener(exceptionListener);
129            }
130            return answer;
131        }
132    
133    }