001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.cxf;
018    
019    import java.io.InputStream;
020    import java.lang.reflect.Method;
021    import java.util.ArrayList;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    
028    import javax.activation.DataHandler;
029    import javax.xml.namespace.QName;
030    
031    import org.w3c.dom.Element;
032    
033    import org.apache.camel.Exchange;
034    import org.apache.camel.ExchangePattern;
035    import org.apache.camel.spi.HeaderFilterStrategy;
036    import org.apache.camel.spi.HeaderFilterStrategyAware;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    import org.apache.cxf.attachment.AttachmentImpl;
040    import org.apache.cxf.binding.soap.SoapHeader;
041    import org.apache.cxf.endpoint.Client;
042    import org.apache.cxf.endpoint.Endpoint;
043    import org.apache.cxf.frontend.MethodDispatcher;
044    import org.apache.cxf.headers.Header;
045    import org.apache.cxf.helpers.CastUtils;
046    import org.apache.cxf.jaxws.context.WrappedMessageContext;
047    import org.apache.cxf.message.Attachment;
048    import org.apache.cxf.message.Message;
049    import org.apache.cxf.message.MessageContentsList;
050    import org.apache.cxf.service.Service;
051    import org.apache.cxf.service.model.BindingMessageInfo;
052    import org.apache.cxf.service.model.BindingOperationInfo;
053    
054    /**
055     * The Default CXF binding implementation.
056     * 
057     * @version $Revision: 17638 $
058     */
059    public class DefaultCxfBinding implements CxfBinding, HeaderFilterStrategyAware {
060        private static final Log LOG = LogFactory.getLog(DefaultCxfBinding.class);
061        private HeaderFilterStrategy headerFilterStrategy;
062    
063        // CxfBinding Methods
064        // -------------------------------------------------------------------------
065        
066        /**
067         * <p>
068         * This method is called by {@link CxfProducer#process(Exchange)}. It populates 
069         * the CXF exchange and invocation context (i.e. request/response) contexts, it 
070         * but does not create and populate a CXF message as a ClientImpl's invoke method
071         * will create a new CXF Message.  That method will put all properties from the 
072         * CXF exchange and request context to the CXF message.
073         * </p>
074         */
075        public void populateCxfRequestFromExchange(
076                org.apache.cxf.message.Exchange cxfExchange, Exchange camelExchange,
077                Map<String, Object> requestContext) {
078                   
079            // propagate request context
080            Map<String, Object> camelHeaders = camelExchange.getIn().getHeaders();
081            extractInvocationContextFromCamel(camelExchange, camelHeaders,
082                    requestContext, Client.REQUEST_CONTEXT);
083                    
084            // propagate headers
085            propagateHeadersFromCamelToCxf(camelExchange, camelHeaders, cxfExchange, 
086                    requestContext);
087            
088            // propagate attachments
089            Set<Attachment> attachments = null;
090            for (Map.Entry<String, DataHandler> entry : camelExchange.getIn().getAttachments().entrySet()) {
091                if (attachments == null) {
092                    attachments = new HashSet<Attachment>();
093                }
094                AttachmentImpl attachment = new AttachmentImpl(entry.getKey(), entry.getValue());
095                attachment.setXOP(true); // only supports MTOM
096                attachments.add(attachment);
097            }
098            
099            if (attachments != null) {
100                requestContext.put(CxfConstants.CAMEL_CXF_ATTACHMENTS, attachments);
101            }
102        }
103        
104        /**
105         * This method is called by {@link CxfProducer#process(Exchange)}.  It propagates 
106         * information from CXF Exchange to Camel Exchange.  The CXF Exchange contains a 
107         * request from a CXF server.
108         */
109        public void populateExchangeFromCxfResponse(Exchange camelExchange,
110                org.apache.cxf.message.Exchange cxfExchange, 
111                Map<String, Object> responseContext) {
112          
113            Message cxfMessage = cxfExchange.getInMessage();
114            
115            if (LOG.isTraceEnabled()) {
116                LOG.trace("Populate exchange from CXF response message: " + cxfMessage);
117            }
118            
119            // propagate body
120            camelExchange.getOut().setBody(DefaultCxfBinding.getContentFromCxf(cxfMessage, 
121                    camelExchange.getProperty(CxfConstants.DATA_FORMAT_PROPERTY, DataFormat.class)));
122            
123            // propagate response context
124            if (responseContext != null && responseContext.size() > 0) {
125                if (!headerFilterStrategy.applyFilterToExternalHeaders(Client.RESPONSE_CONTEXT, 
126                                                                       responseContext, camelExchange)) {        
127                    camelExchange.getOut().setHeader(Client.RESPONSE_CONTEXT, responseContext);
128                    if (LOG.isTraceEnabled()) {
129                        LOG.trace("Set header = " + Client.RESPONSE_CONTEXT + " value = " 
130                                  + responseContext);
131                    }
132                }
133            }
134            
135            // propagate protocol headers
136            propagateHeadersFromCxfToCamel(cxfMessage, camelExchange.getOut(), camelExchange);
137            
138            if (cxfMessage.getAttachments() != null) {
139                // TODO: workaround for CXF-2503
140                try {
141                    cxfMessage.getAttachments().size();
142                } catch (java.util.ConcurrentModificationException e) {
143                    // ignore
144                }
145                // end of workaround
146    
147                // propagate attachments
148                for (Attachment attachment : cxfMessage.getAttachments()) {
149                    camelExchange.getOut().addAttachment(attachment.getId(), attachment.getDataHandler());
150                }        
151            }
152        }
153        
154        /**
155         * This method is called by {@link CxfConsumer}.
156         */
157        public void populateExchangeFromCxfRequest(org.apache.cxf.message.Exchange cxfExchange,
158                Exchange camelExchange) {
159            
160            Method method = null;
161            QName operationName = null;
162            ExchangePattern mep = ExchangePattern.InOut;
163            
164            // extract binding operation information
165            BindingOperationInfo boi = camelExchange.getProperty(BindingOperationInfo.class.getName(), 
166                                                                 BindingOperationInfo.class);
167            if (boi != null) {
168                Service service = (Service)cxfExchange.get(Service.class); 
169                if (service != null) {
170                    MethodDispatcher md = (MethodDispatcher)service
171                        .get(MethodDispatcher.class.getName());
172                    if (md != null) {
173                        method = md.getMethod(boi);
174                    }
175                }
176                
177                if (boi.getOperationInfo().isOneWay()) {
178                    mep = ExchangePattern.InOnly;
179                }
180                    
181                operationName = boi.getName();
182            }
183            
184            // set operation name in header
185            if (operationName != null) {
186                camelExchange.getIn().setHeader(CxfConstants.OPERATION_NAMESPACE, 
187                        boi.getName().getNamespaceURI());
188                camelExchange.getIn().setHeader(CxfConstants.OPERATION_NAME, 
189                        boi.getName().getLocalPart());
190                if (LOG.isTraceEnabled()) {
191                    LOG.trace("Set IN header: " + CxfConstants.OPERATION_NAMESPACE + "=" 
192                             + boi.getName().getNamespaceURI());
193                    LOG.trace("Set IN header: " + CxfConstants.OPERATION_NAME + "=" 
194                            + boi.getName().getLocalPart());
195                }
196            } else if (method != null) {
197                camelExchange.getIn().setHeader(CxfConstants.OPERATION_NAME, method.getName());
198                if (LOG.isTraceEnabled()) {
199                    LOG.trace("Set IN header: " + CxfConstants.OPERATION_NAME + "=" 
200                            + method.getName());
201                }
202            }
203                    
204            // set message exchange pattern
205            camelExchange.setPattern(mep);
206            if (LOG.isTraceEnabled()) {
207                LOG.trace("Set exchange MEP: " + mep);
208            }
209    
210            // propagate headers
211            Message cxfMessage = cxfExchange.getInMessage();
212            propagateHeadersFromCxfToCamel(cxfMessage, camelExchange.getIn(), camelExchange);
213            
214            // Propagating properties from CXF Exchange to Camel Exchange has an  
215            // side effect of copying reply side stuff when the producer is retried.
216            // So, we do not want to do this.
217            //camelExchange.getProperties().putAll(cxfExchange);
218            
219            // propagate request context
220            Object value = cxfMessage.get(Client.REQUEST_CONTEXT);
221            if (value != null && !headerFilterStrategy.applyFilterToExternalHeaders(
222                    Client.REQUEST_CONTEXT, value, camelExchange)) {
223                camelExchange.getIn().setHeader(Client.REQUEST_CONTEXT, value);
224                if (LOG.isTraceEnabled()) {
225                    LOG.trace("Populate context from CXF message " + Client.REQUEST_CONTEXT 
226                            + " value=" + value);
227                }
228            }
229               
230            // set body
231            Object body = DefaultCxfBinding.getContentFromCxf(cxfMessage, 
232                    camelExchange.getProperty(CxfConstants.DATA_FORMAT_PROPERTY, DataFormat.class));
233            if (body != null) {
234                camelExchange.getIn().setBody(body);
235            }  
236            
237            // propagate attachments
238            if (cxfMessage.getAttachments() != null) {
239                for (Attachment attachment : cxfMessage.getAttachments()) {
240                    camelExchange.getIn().addAttachment(attachment.getId(), attachment.getDataHandler());
241                }
242            }
243        }
244    
245        /**
246         * This method is called by {@link CxfConsumer} to populate a CXF response exchange 
247         * from a Camel exchange.
248         */
249        public void populateCxfResponseFromExchange(Exchange camelExchange, 
250                org.apache.cxf.message.Exchange cxfExchange) {
251            
252            // create response context
253            Map<String, Object> responseContext = new HashMap<String, Object>();
254            
255            // propagate response context
256            Map<String, Object> camelHeaders = camelExchange.getOut().getHeaders();
257            extractInvocationContextFromCamel(camelExchange, camelHeaders, 
258                    responseContext, Client.RESPONSE_CONTEXT);
259            
260            propagateHeadersFromCamelToCxf(camelExchange, camelHeaders, cxfExchange, 
261                    responseContext);
262            // create out message
263            Endpoint ep = cxfExchange.get(Endpoint.class);
264            Message outMessage = ep.getBinding().createMessage();
265            cxfExchange.setOutMessage(outMessage);       
266    
267            DataFormat dataFormat = camelExchange.getProperty(CxfConstants.DATA_FORMAT_PROPERTY,  
268                    DataFormat.class);
269            
270            // propagate contexts
271            if (dataFormat != DataFormat.POJO) {
272                // copying response context to out message seems to cause problem in POJO mode
273                outMessage.putAll(responseContext);
274            }
275            outMessage.put(Client.RESPONSE_CONTEXT, responseContext);      
276            
277            if (LOG.isTraceEnabled()) {
278                LOG.trace("Set out response context = " + responseContext);
279            }
280            
281            // set body
282            Object outBody = DefaultCxfBinding.getBodyFromCamel(camelExchange.getOut(), dataFormat);
283            
284            if (outBody != null) {
285                if (dataFormat == DataFormat.PAYLOAD) {
286                    CxfPayload<?> payload = (CxfPayload<?>)outBody;
287                    outMessage.put(List.class, payload.getBody());
288                    outMessage.put(Header.HEADER_LIST, payload.getHeaders());
289                } else {
290                    if (responseContext.get(Header.HEADER_LIST) != null) {
291                        outMessage.put(Header.HEADER_LIST, responseContext.get(Header.HEADER_LIST));
292                    }
293    
294                    MessageContentsList resList = null;
295                    if (outBody instanceof MessageContentsList) {
296                        resList = (MessageContentsList)outBody;
297                    } else if (outBody instanceof List) {
298                        resList = new MessageContentsList((List<?>)outBody);
299                    } else if (outBody.getClass().isArray()) {
300                        resList = new MessageContentsList((Object[])outBody);
301                    } else {
302                        resList = new MessageContentsList(outBody);
303                    }
304    
305                    if (resList != null) {
306                        outMessage.setContent(List.class, resList);
307                        if (LOG.isTraceEnabled()) {
308                            LOG.trace("Set Out CXF message content = " + resList);
309                        }
310                    }
311                }
312            }         
313            
314            // propagate attachments
315            Set<Attachment> attachments = null;
316            for (Map.Entry<String, DataHandler> entry : camelExchange.getOut().getAttachments().entrySet()) {
317                if (attachments == null) {
318                    attachments = new HashSet<Attachment>();
319                }
320                AttachmentImpl attachment = new AttachmentImpl(entry.getKey(), entry.getValue());
321                attachment.setXOP(true); // only supports MTOM
322                attachments.add(attachment);
323            }
324            
325            if (attachments != null) {
326                outMessage.setAttachments(attachments);
327            }
328           
329            BindingOperationInfo boi = cxfExchange.get(BindingOperationInfo.class);
330            if (boi != null) {
331                cxfExchange.put(BindingMessageInfo.class, boi.getOutput());
332            }
333            
334        }
335    
336        
337        // HeaderFilterStrategyAware Methods
338        // -------------------------------------------------------------------------
339        
340    
341        public HeaderFilterStrategy getHeaderFilterStrategy() {
342            return headerFilterStrategy;
343        }
344    
345        public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
346            this.headerFilterStrategy = strategy;
347        }
348    
349           
350        // Non public methods
351        // -------------------------------------------------------------------------
352        
353        /**
354         * @param camelExchange
355         * @param cxfContext Request or Response context
356         * @param camelHeaders 
357         * @param contextKey 
358         */
359        @SuppressWarnings("unchecked")
360        protected void extractInvocationContextFromCamel(Exchange camelExchange,
361                Map<String, Object> camelHeaders, Map<String, Object> cxfContext,
362                String contextKey) {
363            
364            // extract from header
365            Map context = (Map)camelHeaders.get(contextKey);
366            if (context != null) {
367                cxfContext.putAll(context);
368                if (LOG.isTraceEnabled()) {
369                    LOG.trace("Propagate " + contextKey + " from header context = " 
370                            + ((context instanceof WrappedMessageContext) 
371                                    ? ((WrappedMessageContext)context).getWrappedMap() 
372                                            : context));
373                }
374            }
375            
376            // extract from exchange property
377            context = (Map)camelExchange.getProperty(contextKey);
378            if (context != null) {
379                cxfContext.putAll(context);
380                if (LOG.isTraceEnabled()) {
381                    LOG.trace("Propagate " + contextKey + " from exchange property context = " 
382                            + ((context instanceof WrappedMessageContext) 
383                                    ? ((WrappedMessageContext)context).getWrappedMap() 
384                                            : context));
385                }
386            }
387            
388            // copy camel exchange properties into context
389            if (camelExchange.getProperties() != null) {
390                cxfContext.putAll(camelExchange.getProperties());
391            }
392            
393            camelExchange.setProperty(contextKey, cxfContext);
394        }
395    
396        /**
397         * @param cxfMessage
398         * @param camelMessage
399         * @param exchange provides context for filtering
400         */
401        protected void propagateHeadersFromCxfToCamel(Message cxfMessage,
402                org.apache.camel.Message camelMessage, Exchange exchange) {
403            
404            Map<String, List<String>> cxfHeaders = (Map)cxfMessage.get(Message.PROTOCOL_HEADERS);
405            Map<String, Object> camelHeaders = camelMessage.getHeaders();
406    
407            if (cxfHeaders != null) {
408                for (Map.Entry<String, List<String>> entry : cxfHeaders.entrySet()) {
409                    if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), 
410                                                                           entry.getValue(), exchange)) {
411                        camelHeaders.put(entry.getKey(), entry.getValue().get(0));
412                        if (LOG.isTraceEnabled()) {
413                            LOG.trace("Populate header from CXF header=" + entry.getKey() + " value="
414                                    + entry.getValue());
415                        }
416                    }
417                }
418            }
419            
420            
421            // propagate SOAP/protocol header list
422            String key = Header.HEADER_LIST;
423            Object value = cxfMessage.get(key);
424            if (value != null) {
425                if (!headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
426                    camelHeaders.put(key, value);
427                    if (LOG.isTraceEnabled()) {
428                        LOG.trace("Populate header from CXF header=" + key + " value=" + value);
429                    }
430                } else {
431                    ((List<?>)value).clear();
432                }
433            }
434        }
435    
436        protected void propagateHeadersFromCamelToCxf(Exchange camelExchange, 
437                Map<String, Object> camelHeaders,
438                org.apache.cxf.message.Exchange cxfExchange, 
439                Map<String, Object> cxfContext) {
440            
441            // get cxf transport headers (if any) from camel exchange
442            Map<String, List<String>> transportHeaders = new HashMap<String, List<String>>();
443            if (camelExchange != null) {
444                Map<String, List<String>> h = (Map)camelExchange.getProperty(Message.PROTOCOL_HEADERS);
445                if (h != null) {
446                    transportHeaders.putAll(h);
447                }
448            }
449            Map<String, List<String>> headers = (Map)camelHeaders.get(Message.PROTOCOL_HEADERS);
450            if (headers != null) {
451                transportHeaders.putAll(headers);
452            }
453                
454            for (Map.Entry<String, Object> entry : camelHeaders.entrySet()) {    
455                // this header should be filtered, continue to the next header
456                if (headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), camelExchange)) {
457                    continue;
458                }
459                
460                if (LOG.isTraceEnabled()) {
461                    LOG.trace("Propagate to CXF header: " + entry.getKey() + " value: " + entry.getValue());
462                }
463                
464                // put response code in request context so it will be copied to CXF message's property
465                if (Message.RESPONSE_CODE.equals(entry.getKey())) {
466                    cxfContext.put(entry.getKey(), entry.getValue());
467                    continue;
468                }
469                
470                // put SOAP/protocol header list in exchange
471                if (Header.HEADER_LIST.equals(entry.getKey())) {
472                    List<Header> headerList = (List<Header>)entry.getValue();
473                    for (Header header : headerList) {
474                        header.setDirection(Header.Direction.DIRECTION_OUT);
475                        if (LOG.isTraceEnabled()) {
476                            LOG.trace("Propagate SOAP/protocol header: " + header.getName() + " : " + header.getObject());
477                        }
478                    }
479                    
480                    //cxfExchange.put(Header.HEADER_LIST, headerList);
481                    cxfContext.put(entry.getKey(), headerList);
482                    continue;
483                }
484                
485                // things that are not filtered and not specifically copied will be put in transport headers
486                if (entry.getValue() instanceof List) {
487                    transportHeaders.put(entry.getKey(), (List<String>)entry.getValue());
488                } else {
489                    List<String> listValue = new ArrayList<String>();
490                    listValue.add(entry.getValue().toString());
491                    transportHeaders.put(entry.getKey(), listValue);
492                }
493                
494            }
495            
496            if (transportHeaders.size() > 0) {
497                cxfContext.put(Message.PROTOCOL_HEADERS, transportHeaders);
498            }        
499        }
500    
501        protected static Object getContentFromCxf(Message message, DataFormat dataFormat) {
502            Set<Class<?>> contentFormats = message.getContentFormats();
503            Object answer = null;
504            if (contentFormats != null) {
505                
506                if (LOG.isTraceEnabled()) {
507                    for (Class<?> contentFormat : contentFormats) {
508                        LOG.trace("Content format=" + contentFormat + " value=" 
509                                + message.getContent(contentFormat));
510                    }
511                }
512                
513                if (dataFormat == DataFormat.POJO) {
514                    answer = message.getContent(List.class);  
515                    if (answer == null) {
516                        answer = message.getContent(Object.class);
517                        if (answer != null) {
518                            answer = new MessageContentsList(answer);
519                        }
520                    }
521                } else if (dataFormat == DataFormat.PAYLOAD) {
522                    // TODO handle other message types in the future.  Currently, this binding only 
523                    // deal with SOAP in PayLoad mode.
524                    List<Element> body = message.get(List.class);
525                    List<SoapHeader> headers = CastUtils.cast((List<?>)message.get(Header.HEADER_LIST));
526                    answer = new CxfPayload<SoapHeader>(headers, body);
527                } else if (dataFormat == DataFormat.MESSAGE) {
528                    answer = message.getContent(InputStream.class);
529                }
530    
531                if (LOG.isTraceEnabled()) {
532                    LOG.trace("Extracted body from CXF message = " + answer);
533                }
534            }
535            return answer;
536        }
537        
538        public static Object getBodyFromCamel(org.apache.camel.Message out,
539                DataFormat dataFormat) {
540            Object answer = null;
541            
542            if (dataFormat == DataFormat.POJO) {
543                answer = out.getBody();
544            } else if (dataFormat == DataFormat.PAYLOAD) {
545                answer = out.getBody(CxfPayload.class);
546            } else if (dataFormat == DataFormat.MESSAGE) {
547                answer = out.getBody(InputStream.class);
548            }
549            return answer;
550        }
551    
552        public void copyJaxWsContext(org.apache.cxf.message.Exchange cxfExchange, Map<String, Object> context) {
553            if (cxfExchange.getOutMessage() != null) {
554                org.apache.cxf.message.Message outMessage = cxfExchange.getOutMessage();
555                for (Map.Entry<String, Object> entry : context.entrySet()) {
556                    if (outMessage.get(entry.getKey()) == null) {
557                        outMessage.put(entry.getKey(), entry.getValue());
558                    }
559                }
560            }
561        }
562    
563        public void extractJaxWsContext(org.apache.cxf.message.Exchange cxfExchange, Map<String, Object> context) {
564            org.apache.cxf.message.Message inMessage = cxfExchange.getInMessage();
565            for (Map.Entry<String, Object> entry : inMessage.entrySet()) {
566                if (entry.getKey().startsWith("javax.xml.ws")) {
567                    context.put(entry.getKey(), entry.getValue());
568                }
569            }
570            
571        }
572    
573    }