001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.jms.reply;
018
019 import java.util.concurrent.ScheduledExecutorService;
020 import javax.jms.Destination;
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023
024 import org.apache.camel.AsyncCallback;
025 import org.apache.camel.Exchange;
026 import org.apache.camel.ExchangeTimedOutException;
027 import org.apache.camel.component.jms.JmsEndpoint;
028 import org.apache.camel.component.jms.JmsMessage;
029 import org.apache.camel.component.jms.JmsMessageHelper;
030 import org.apache.camel.impl.ServiceSupport;
031 import org.apache.camel.util.ObjectHelper;
032 import org.apache.camel.util.ServiceHelper;
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.springframework.jms.listener.AbstractMessageListenerContainer;
036
037 /**
038 * Base class for {@link ReplyManager} implementations.
039 *
040 * @version $Revision: 20573 $
041 */
042 public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
043
044 protected final Log log = LogFactory.getLog(getClass());
045 protected ScheduledExecutorService executorService;
046 protected JmsEndpoint endpoint;
047 protected Destination replyTo;
048 protected AbstractMessageListenerContainer listenerContainer;
049 protected long replyToResolverTimeout = 5000;
050 protected CorrelationMap correlation;
051
052 public void setScheduledExecutorService(ScheduledExecutorService executorService) {
053 this.executorService = executorService;
054 }
055
056 public void setEndpoint(JmsEndpoint endpoint) {
057 this.endpoint = endpoint;
058 }
059
060 public void setReplyTo(Destination replyTo) {
061 if (log.isTraceEnabled()) {
062 log.trace("ReplyTo destination: " + replyTo);
063 }
064 this.replyTo = replyTo;
065 }
066
067 public Destination getReplyTo() {
068 synchronized (this) {
069 try {
070 // wait for the reply to destination to be resolved
071 if (replyTo == null) {
072 wait(replyToResolverTimeout);
073 }
074 } catch (Throwable e) {
075 // ignore
076 }
077 }
078 return replyTo;
079 }
080
081 public void onMessage(Message message) {
082 String correlationID = null;
083 try {
084 correlationID = message.getJMSCorrelationID();
085 } catch (JMSException e) {
086 // ignore
087 }
088 if (correlationID == null) {
089 log.warn("Ignoring message with no correlationID: " + message);
090 return;
091 }
092
093 if (log.isDebugEnabled()) {
094 log.debug("Received reply message with correlationID: " + correlationID + " -> " + message);
095 }
096
097 // handle the reply message
098 handleReplyMessage(correlationID, message);
099 }
100
101 public void processReply(ReplyHolder holder) {
102 if (holder != null && isRunAllowed()) {
103 Exchange exchange = holder.getExchange();
104 Message message = holder.getMessage();
105
106 boolean timeout = holder.isTimeout();
107 if (timeout) {
108 // no response, so lets set a timed out exception
109 exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
110 } else {
111 JmsMessage response = new JmsMessage(message, endpoint.getBinding());
112 Object body = response.getBody();
113
114 if (endpoint.isTransferException() && body instanceof Exception) {
115 if (log.isDebugEnabled()) {
116 log.debug("Reply received. Setting reply as an Exception: " + body);
117 }
118 // we got an exception back and endpoint was configured to transfer exception
119 // therefore set response as exception
120 exchange.setException((Exception) body);
121 } else {
122 if (log.isDebugEnabled()) {
123 log.debug("Reply received. Setting reply as OUT message: " + body);
124 }
125 // regular response
126 exchange.setOut(response);
127 }
128
129 // restore correlation id in case the remote server messed with it
130 if (holder.getOriginalCorrelationId() != null) {
131 JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
132 exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
133 }
134 }
135
136 // notify callback
137 AsyncCallback callback = holder.getCallback();
138 callback.done(false);
139 }
140 }
141
142 protected abstract void handleReplyMessage(String correlationID, Message message);
143
144 protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception;
145
146 /**
147 * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only
148 * testing using InOut over JMS. Its unlikely to happen in a real life situation with communication
149 * to a remote broker, which always will be slower to send back reply, before Camel had a chance
150 * to update it's internal correlation map.
151 */
152 protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
153 // race condition, when using messageID as correlationID then we store a provisional correlation id
154 // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
155 // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
156 // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
157 if (log.isWarnEnabled()) {
158 log.warn("Early reply received with correlationID [" + correlationID + "] -> " + message);
159 }
160
161 ReplyHandler answer = null;
162
163 // wait up till 5 seconds
164 boolean done = false;
165 int counter = 0;
166 while (!done && counter++ < 50) {
167 if (log.isTraceEnabled()) {
168 log.trace("Early reply not found handler at attempt " + counter + ". Waiting a bit longer.");
169 }
170 try {
171 Thread.sleep(100);
172 } catch (InterruptedException e) {
173 // ignore
174 }
175
176 // try again
177 answer = correlation.get(correlationID);
178 done = answer != null;
179
180 if (answer != null) {
181 if (log.isTraceEnabled()) {
182 log.trace("Early reply with correlationID [" + correlationID + "] has been matched after "
183 + counter + " attempts and can be processed using handler: " + answer);
184 }
185 }
186 }
187
188 return answer;
189 }
190
191 @Override
192 protected void doStart() throws Exception {
193 ObjectHelper.notNull(executorService, "executorService", this);
194 ObjectHelper.notNull(endpoint, "endpoint", this);
195
196 // purge for timeout every second
197 correlation = new CorrelationMap(executorService, 1000);
198 ServiceHelper.startService(correlation);
199
200 // create JMS listener and start it
201 listenerContainer = createListenerContainer();
202 listenerContainer.afterPropertiesSet();
203 listenerContainer.start();
204 }
205
206 @Override
207 protected void doStop() throws Exception {
208 ServiceHelper.stopService(correlation);
209
210 if (listenerContainer != null) {
211 listenerContainer.stop();
212 listenerContainer.destroy();
213 listenerContainer = null;
214 }
215 }
216
217 }