001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.jms.reply;
018
019 import javax.jms.Destination;
020 import javax.jms.ExceptionListener;
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023 import javax.jms.Session;
024 import javax.jms.TemporaryQueue;
025
026 import org.apache.camel.AsyncCallback;
027 import org.apache.camel.Exchange;
028 import org.apache.camel.util.IntrospectionSupport;
029 import org.springframework.core.task.TaskExecutor;
030 import org.springframework.jms.listener.AbstractMessageListenerContainer;
031 import org.springframework.jms.listener.DefaultMessageListenerContainer;
032 import org.springframework.jms.support.destination.DestinationResolver;
033
034 /**
035 * A {@link ReplyManager} when using temporary queues.
036 *
037 * @version $Revision: 21895 $
038 */
039 public class TemporaryQueueReplyManager extends ReplyManagerSupport {
040
041 public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
042 String originalCorrelationId, String correlationId, long requestTimeout) {
043 // add to correlation map
044 TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, requestTimeout);
045 correlation.put(correlationId, handler, requestTimeout);
046 return correlationId;
047 }
048
049 public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) {
050 if (log.isTraceEnabled()) {
051 log.trace("Updated provisional correlationId [" + correlationId + "] to expected correlationId [" + newCorrelationId + "]");
052 }
053
054 ReplyHandler handler = correlation.remove(correlationId);
055 correlation.put(newCorrelationId, handler, requestTimeout);
056 }
057
058 @Override
059 protected void handleReplyMessage(String correlationID, Message message) {
060 ReplyHandler handler = correlation.get(correlationID);
061 if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
062 handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
063 }
064
065 if (handler != null) {
066 try {
067 handler.onReply(correlationID, message);
068 } finally {
069 correlation.remove(correlationID);
070 }
071 } else {
072 // we could not correlate the received reply message to a matching request and therefore
073 // we cannot continue routing the unknown message
074 String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
075 log.warn(text);
076 throw new UnknownReplyMessageException(text, message, correlationID);
077 }
078 }
079
080 public void setReplyToSelectorHeader(org.apache.camel.Message camelMessage, Message jmsMessage) throws JMSException {
081 // noop
082 }
083
084 @Override
085 protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
086 // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
087 DefaultMessageListenerContainer answer = new DefaultMessageListenerContainer();
088
089 answer.setDestinationName("temporary");
090 answer.setDestinationResolver(new DestinationResolver() {
091 public Destination resolveDestinationName(Session session, String destinationName,
092 boolean pubSubDomain) throws JMSException {
093 // use a temporary queue to gather the reply message
094 TemporaryQueue queue = null;
095 synchronized (TemporaryQueueReplyManager.this) {
096 try {
097 queue = session.createTemporaryQueue();
098 setReplyTo(queue);
099 } finally {
100 TemporaryQueueReplyManager.this.notifyAll();
101 }
102 }
103 return queue;
104 }
105 });
106 answer.setAutoStartup(true);
107 answer.setMessageListener(this);
108 answer.setPubSubDomain(false);
109 answer.setSubscriptionDurable(false);
110 answer.setConcurrentConsumers(1);
111 answer.setConnectionFactory(endpoint.getConnectionFactory());
112 answer.setSessionTransacted(false);
113 String clientId = endpoint.getClientId();
114 if (clientId != null) {
115 clientId += ".CamelReplyManager";
116 answer.setClientId(clientId);
117 }
118 TaskExecutor taskExecutor = endpoint.getTaskExecutor();
119 if (taskExecutor != null) {
120 answer.setTaskExecutor(taskExecutor);
121 }
122 if (endpoint.getTaskExecutorSpring2() != null) {
123 // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
124 IntrospectionSupport.setProperty(answer, "taskExecutor", endpoint.getTaskExecutorSpring2());
125 }
126 ExceptionListener exceptionListener = endpoint.getExceptionListener();
127 if (exceptionListener != null) {
128 answer.setExceptionListener(exceptionListener);
129 }
130 return answer;
131 }
132
133 }