001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.cxf;
018    
019    import java.lang.reflect.Method;
020    import java.util.HashMap;
021    import java.util.Map;
022    import javax.xml.ws.WebFault;
023    
024    import org.w3c.dom.Element;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.Processor;
028    import org.apache.camel.impl.DefaultConsumer;
029    import org.apache.camel.util.ObjectHelper;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.cxf.continuations.Continuation;
033    import org.apache.cxf.continuations.ContinuationProvider;
034    import org.apache.cxf.endpoint.Server;
035    import org.apache.cxf.frontend.ServerFactoryBean;
036    import org.apache.cxf.interceptor.Fault;
037    import org.apache.cxf.message.Exchange;
038    import org.apache.cxf.message.Message;
039    import org.apache.cxf.service.invoker.Invoker;
040    import org.apache.cxf.service.model.BindingOperationInfo;
041    import org.apache.cxf.version.Version;
042    
043    /**
044     * A Consumer of exchanges for a service in CXF.  CxfConsumer acts a CXF
045     * service to receive requests, convert them, and forward them to Camel 
046     * route for processing. It is also responsible for converting and sending
047     * back responses to CXF client.
048     *
049     * @version $Revision: 22113 $
050     */
051    public class CxfConsumer extends DefaultConsumer {
052        private static final Log LOG = LogFactory.getLog(CxfConsumer.class);
053        private Server server;
054    
055        public CxfConsumer(final CxfEndpoint endpoint, Processor processor) throws Exception {
056            super(endpoint, processor);
057            
058            // create server
059            ServerFactoryBean svrBean = endpoint.createServerFactoryBean();
060            svrBean.setInvoker(new Invoker() {
061    
062                // we receive a CXF request when this method is called
063                public Object invoke(Exchange cxfExchange, Object o) {
064                    if (LOG.isTraceEnabled()) {
065                        LOG.trace("Received CXF Request: " + cxfExchange);
066                    }                
067                    Continuation continuation = getContinuation(cxfExchange);
068                    // Only calling the continuation API for CXF 2.3.x 
069                    if (continuation != null && !endpoint.isSynchronous() && Version.getCurrentVersion().startsWith("2.3")) {
070                        if (LOG.isTraceEnabled()) {
071                            LOG.trace("Calling the Camel async processors.");
072                        }
073                        return asyncInvoke(cxfExchange, continuation);
074                    } else {
075                        if (LOG.isTraceEnabled()) {
076                            LOG.trace("Calling the Camel sync processors.");
077                        }
078                        return syncInvoke(cxfExchange);
079                    }
080                }            
081                
082                // NOTE this code cannot work with CXF 2.2.x
083                private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
084                    synchronized (continuation) {
085                        if (continuation.isNew()) {
086                            final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
087                            
088                            // Now we don't set up the timeout value
089                            if (LOG.isTraceEnabled()) {
090                                LOG.trace("Suspending continuation of exchangeId: " + camelExchange.getExchangeId());
091                            }
092                            // TODO Support to set the timeout in case the Camel can't send the response back on time.
093                            // The continuation could be called before the suspend is called
094                            continuation.suspend(0);
095    
096                            // use the asynchronous API to process the exchange
097                            getAsyncProcessor().process(camelExchange, new AsyncCallback() {
098                                public void done(boolean doneSync) {
099                                    // make sure the continuation resume will not be called before the suspend method in other thread
100                                    synchronized (continuation) {
101                                        if (LOG.isTraceEnabled()) {
102                                            LOG.trace("Resuming continuation of exchangeId: "
103                                                      + camelExchange.getExchangeId());
104                                        }
105                                        // resume processing after both, sync and async callbacks
106                                        continuation.setObject(camelExchange);
107                                        continuation.resume();
108                                    }
109                                }
110                            });
111                            
112                        }
113                        if (continuation.isResumed()) {
114                            org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
115                                .getObject();
116                            setResponseBack(cxfExchange, camelExchange);
117    
118                        }
119                    }
120                    return null;
121                }
122    
123                private Continuation getContinuation(Exchange cxfExchange) {
124                    ContinuationProvider provider = 
125                        (ContinuationProvider)cxfExchange.getInMessage().get(ContinuationProvider.class.getName());
126                    return provider == null ? null : provider.getContinuation();
127                }
128                
129                private Object syncInvoke(Exchange cxfExchange) {
130                    org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);               
131                    // send Camel exchange to the target processor
132                    if (LOG.isTraceEnabled()) {
133                        LOG.trace("Processing +++ START +++");
134                    }
135                    try {
136                        getProcessor().process(camelExchange);
137                    } catch (Exception e) {
138                        throw new Fault(e);
139                    }
140                    if (LOG.isTraceEnabled()) {
141                        LOG.trace("Processing +++ END +++");
142                    }
143                    setResponseBack(cxfExchange, camelExchange);
144                    // response should have been set in outMessage's content
145                    return null;
146                }
147                
148                private org.apache.camel.Exchange perpareCamelExchange(Exchange cxfExchange) {
149                    // get CXF binding
150                    CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
151                    CxfBinding binding = endpoint.getCxfBinding();
152    
153                    // create a Camel exchange
154                    org.apache.camel.Exchange camelExchange = endpoint.createExchange();
155                    DataFormat dataFormat = endpoint.getDataFormat();
156    
157                    BindingOperationInfo boi = cxfExchange.getBindingOperationInfo();
158                    // make sure the "boi" is remained as wrapped in PAYLOAD mode
159                    if (dataFormat == DataFormat.PAYLOAD && boi.isUnwrapped()) {
160                        boi = boi.getWrappedOperation();
161                        cxfExchange.put(BindingOperationInfo.class, boi);
162                    }
163                    
164                    if (boi != null) {
165                        camelExchange.setProperty(BindingOperationInfo.class.getName(), boi);
166                        if (LOG.isTraceEnabled()) {
167                            LOG.trace("Set exchange property: BindingOperationInfo: " + boi);
168                        }
169                    }
170                    
171                    // set data format mode in Camel exchange
172                    camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat);   
173                    if (LOG.isTraceEnabled()) {
174                        LOG.trace("Set Exchange property: " + DataFormat.class.getName() 
175                                + "=" + dataFormat);
176                    }
177                    
178                    camelExchange.setProperty(Message.MTOM_ENABLED, String.valueOf(endpoint.isMtomEnabled()));
179                    
180                    // bind the CXF request into a Camel exchange
181                    binding.populateExchangeFromCxfRequest(cxfExchange, camelExchange);
182                    // extract the javax.xml.ws header
183                    Map<String, Object> context = new HashMap<String, Object>();
184                    binding.extractJaxWsContext(cxfExchange, context);
185                    // put the context into camelExchange
186                    camelExchange.setProperty(CxfConstants.JAXWS_CONTEXT, context);
187                    return camelExchange;
188                    
189                }
190                
191                @SuppressWarnings("unchecked")
192                private void setResponseBack(Exchange cxfExchange, org.apache.camel.Exchange camelExchange) {
193                    CxfEndpoint endpoint = (CxfEndpoint)getEndpoint();
194                    CxfBinding binding = endpoint.getCxfBinding();                
195                    
196                    checkFailure(camelExchange);
197                    
198                    // bind the Camel response into a CXF response
199                    if (camelExchange.getPattern().isOutCapable()) {
200                        binding.populateCxfResponseFromExchange(camelExchange, cxfExchange);
201                    }
202                    
203                    // check failure again as fault could be discovered by converter
204                    checkFailure(camelExchange);
205    
206                    // copy the headers javax.xml.ws header back
207                    binding.copyJaxWsContext(cxfExchange, (Map<String, Object>)camelExchange.getProperty(CxfConstants.JAXWS_CONTEXT));
208                }
209    
210                private void checkFailure(org.apache.camel.Exchange camelExchange) throws Fault {
211                    final Throwable t;
212                    if (camelExchange.isFailed()) {
213                        t = (camelExchange.hasOut() && camelExchange.getOut().isFault()) ? camelExchange.getOut()
214                            .getBody(Throwable.class) : camelExchange.getException();
215                        if (t instanceof Fault) {
216                            throw (Fault)t;
217                        } else if (t != null) {                        
218                            // This is not a CXF Fault. Build the CXF Fault manuallly.
219                            Fault fault = new Fault(t);
220                            if (fault.getMessage() == null) {
221                                // The Fault has no Message. This is the case if t had
222                                // no message, for
223                                // example was a NullPointerException.
224                                fault.setMessage(t.getClass().getSimpleName());
225                            }
226                            WebFault faultAnnotation = t.getClass().getAnnotation(WebFault.class);
227                            Object faultInfo = null;
228                            try {
229                                Method method = t.getClass().getMethod("getFaultInfo", new Class[0]);
230                                faultInfo = method.invoke(t, new Object[0]);
231                            } catch (Exception e) {
232                                // do nothing here                            
233                            }
234                            if (faultAnnotation != null && faultInfo == null) {
235                                // t has a JAX-WS WebFault annotation, which describes
236                                // in detail the Web Service Fault that should be thrown. Add the
237                                // detail.
238                                Element detail = fault.getOrCreateDetail();
239                                Element faultDetails = detail.getOwnerDocument()
240                                    .createElementNS(faultAnnotation.targetNamespace(), faultAnnotation.name());
241                                detail.appendChild(faultDetails);
242                            }
243    
244                            throw fault;
245                        }
246    
247                    }
248                }
249    
250            });
251            server = svrBean.create();
252            if (ObjectHelper.isNotEmpty(endpoint.getPublishedEndpointUrl())) {
253                server.getEndpoint().getEndpointInfo().setProperty("publishedEndpointUrl", endpoint.getPublishedEndpointUrl());
254            }
255        }
256        
257        @Override
258        protected void doStart() throws Exception {
259            super.doStart();
260            server.start();
261        }
262    
263        @Override
264        protected void doStop() throws Exception {
265            server.stop();
266            super.doStop();
267        }
268        
269        public Server getServer() {
270            return server;
271        }
272        
273    }