001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.cxf;
018
019 import java.lang.reflect.Method;
020 import java.util.HashMap;
021 import java.util.Map;
022 import javax.xml.ws.WebFault;
023
024 import org.w3c.dom.Element;
025
026 import org.apache.camel.AsyncCallback;
027 import org.apache.camel.Processor;
028 import org.apache.camel.impl.DefaultConsumer;
029 import org.apache.camel.util.ObjectHelper;
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032 import org.apache.cxf.continuations.Continuation;
033 import org.apache.cxf.continuations.ContinuationProvider;
034 import org.apache.cxf.endpoint.Server;
035 import org.apache.cxf.frontend.ServerFactoryBean;
036 import org.apache.cxf.interceptor.Fault;
037 import org.apache.cxf.message.Exchange;
038 import org.apache.cxf.message.Message;
039 import org.apache.cxf.service.invoker.Invoker;
040 import org.apache.cxf.service.model.BindingOperationInfo;
041 import org.apache.cxf.version.Version;
042
043 /**
044 * A Consumer of exchanges for a service in CXF. CxfConsumer acts a CXF
045 * service to receive requests, convert them, and forward them to Camel
046 * route for processing. It is also responsible for converting and sending
047 * back responses to CXF client.
048 *
049 * @version $Revision: 22113 $
050 */
051 public class CxfConsumer extends DefaultConsumer {
052 private static final Log LOG = LogFactory.getLog(CxfConsumer.class);
053 private Server server;
054
055 public CxfConsumer(final CxfEndpoint endpoint, Processor processor) throws Exception {
056 super(endpoint, processor);
057
058 // create server
059 ServerFactoryBean svrBean = endpoint.createServerFactoryBean();
060 svrBean.setInvoker(new Invoker() {
061
062 // we receive a CXF request when this method is called
063 public Object invoke(Exchange cxfExchange, Object o) {
064 if (LOG.isTraceEnabled()) {
065 LOG.trace("Received CXF Request: " + cxfExchange);
066 }
067 Continuation continuation = getContinuation(cxfExchange);
068 // Only calling the continuation API for CXF 2.3.x
069 if (continuation != null && !endpoint.isSynchronous() && Version.getCurrentVersion().startsWith("2.3")) {
070 if (LOG.isTraceEnabled()) {
071 LOG.trace("Calling the Camel async processors.");
072 }
073 return asyncInvoke(cxfExchange, continuation);
074 } else {
075 if (LOG.isTraceEnabled()) {
076 LOG.trace("Calling the Camel sync processors.");
077 }
078 return syncInvoke(cxfExchange);
079 }
080 }
081
082 // NOTE this code cannot work with CXF 2.2.x
083 private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
084 synchronized (continuation) {
085 if (continuation.isNew()) {
086 final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
087
088 // Now we don't set up the timeout value
089 if (LOG.isTraceEnabled()) {
090 LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
091 }
092 // TODO Support to set the timeout in case the Camel can't send the response back on time.
093 // The continuation could be called before the suspend is called
094 continuation.suspend(0);
095
096 // use the asynchronous API to process the exchange
097 getAsyncProcessor().process(camelExchange, new AsyncCallback() {
098 public void done(boolean doneSync) {
099 // make sure the continuation resume will not be called before the suspend method in other thread
100 synchronized (continuation) {
101 if (LOG.isTraceEnabled()) {
102 LOG.trace("Resuming continuation of exchangeId: "
103 + camelExchange.getExchangeId());
104 }
105 // resume processing after both, sync and async callbacks
106 continuation.setObject(camelExchange);
107 continuation.resume();
108 }
109 }
110 });
111
112 }
113 if (continuation.isResumed()) {
114 org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
115 .getObject();
116 setResponseBack(cxfExchange, camelExchange);
117
118 }
119 }
120 return null;
121 }
122
123 private Continuation getContinuation(Exchange cxfExchange) {
124 ContinuationProvider provider =
125 (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
126 return provider == null ? null : provider.getContinuation();
127 }
128
129 private Object syncInvoke(Exchange cxfExchange) {
130 org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
131 // send Camel exchange to the target processor
132 if (LOG.isTraceEnabled()) {
133 LOG.trace("Processing +++ START +++");
134 }
135 try {
136 getProcessor().process(camelExchange);
137 } catch (Exception e) {
138 throw new Fault(e);
139 }
140 if (LOG.isTraceEnabled()) {
141 LOG.trace("Processing +++ END +++");
142 }
143 setResponseBack(cxfExchange, camelExchange);
144 // response should have been set in outMessage's content
145 return null;
146 }
147
148 private org.apache.camel.Exchange perpareCamelExchange(Exchange cxfExchange) {
149 // get CXF binding
150 CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
151 CxfBinding binding = endpoint.getCxfBinding();
152
153 // create a Camel exchange
154 org.apache.camel.Exchange camelExchange = endpoint.createExchange();
155 DataFormat dataFormat = endpoint.getDataFormat();
156
157 BindingOperationInfo boi = cxfExchange.getBindingOperationInfo();
158 // make sure the "boi" is remained as wrapped in PAYLOAD mode
159 if (dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) {
160 boi = boi.getWrappedOperation();
161 cxfExchange.put(BindingOperationInfo.class, boi);
162 }
163
164 if (boi != null) {
165 camelExchange.setProperty(BindingOperationInfo.class.getName(), boi);
166 if (LOG.isTraceEnabled()) {
167 LOG.trace("Set exchange property: BindingOperationInfo: " + boi);
168 }
169 }
170
171 // set data format mode in Camel exchange
172 camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat);
173 if (LOG.isTraceEnabled()) {
174 LOG.trace("Set Exchange property: " + DataFormat.class.getName()
175 + "=" + dataFormat);
176 }
177
178 camelExchange.setProperty(Message.MTOM_ENABLED, String.valueOf(endpoint.isMtomEnabled()));
179
180 // bind the CXF request into a Camel exchange
181 binding.populateExchangeFromCxfRequest(cxfExchange, camelExchange);
182 // extract the javax.xml.ws header
183 Map<String, Object> context = new HashMap<String, Object>();
184 binding.extractJaxWsContext(cxfExchange, context);
185 // put the context into camelExchange
186 camelExchange.setProperty(CxfConstants.JAXWS_CONTEXT, context);
187 return camelExchange;
188
189 }
190
191 @SuppressWarnings("unchecked")
192 private void setResponseBack(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) {
193 CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
194 CxfBinding binding = endpoint.getCxfBinding();
195
196 checkFailure(camelExchange);
197
198 // bind the Camel response into a CXF response
199 if (camelExchange.getPattern().isOutCapable()) {
200 binding.populateCxfResponseFromExchange(camelExchange, cxfExchange);
201 }
202
203 // check failure again as fault could be discovered by converter
204 checkFailure(camelExchange);
205
206 // copy the headers javax.xml.ws header back
207 binding.copyJaxWsContext(cxfExchange, (Map<String, Object>)camelExchange.getProperty(CxfConstants.JAXWS_CONTEXT));
208 }
209
210 private void checkFailure(org.apache.camel.Exchange camelExchange) throws Fault {
211 final Throwable t;
212 if (camelExchange.isFailed()) {
213 t = (camelExchange.hasOut() && camelExchange.getOut().isFault()) ? camelExchange.getOut()
214 .getBody(Throwable.class) : camelExchange.getException();
215 if (t instanceof Fault) {
216 throw (Fault)t;
217 } else if (t != null) {
218 // This is not a CXF Fault. Build the CXF Fault manuallly.
219 Fault fault = new Fault(t);
220 if (fault.getMessage() == null) {
221 // The Fault has no Message. This is the case if t had
222 // no message, for
223 // example was a NullPointerException.
224 fault.setMessage(t.getClass().getSimpleName());
225 }
226 WebFault faultAnnotation = t.getClass().getAnnotation(WebFault.class);
227 Object faultInfo = null;
228 try {
229 Method method = t.getClass().getMethod("getFaultInfo", new Class[0]);
230 faultInfo = method.invoke(t, new Object[0]);
231 } catch (Exception e) {
232 // do nothing here
233 }
234 if (faultAnnotation != null && faultInfo == null) {
235 // t has a JAX-WS WebFault annotation, which describes
236 // in detail the Web Service Fault that should be thrown. Add the
237 // detail.
238 Element detail = fault.getOrCreateDetail();
239 Element faultDetails = detail.getOwnerDocument()
240 .createElementNS(faultAnnotation.targetNamespace(), faultAnnotation.name());
241 detail.appendChild(faultDetails);
242 }
243
244 throw fault;
245 }
246
247 }
248 }
249
250 });
251 server = svrBean.create();
252 if (ObjectHelper.isNotEmpty(endpoint.getPublishedEndpointUrl())) {
253 server.getEndpoint().getEndpointInfo().setProperty("publishedEndpointUrl", endpoint.getPublishedEndpointUrl());
254 }
255 }
256
257 @Override
258 protected void doStart() throws Exception {
259 super.doStart();
260 server.start();
261 }
262
263 @Override
264 protected void doStop() throws Exception {
265 server.stop();
266 super.doStop();
267 }
268
269 public Server getServer() {
270 return server;
271 }
272
273 }