001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.cxf;
018
019 import java.io.InputStream;
020 import java.lang.reflect.Method;
021 import java.util.ArrayList;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Set;
027
028 import javax.activation.DataHandler;
029 import javax.xml.namespace.QName;
030
031 import org.w3c.dom.Element;
032
033 import org.apache.camel.Exchange;
034 import org.apache.camel.ExchangePattern;
035 import org.apache.camel.spi.HeaderFilterStrategy;
036 import org.apache.camel.spi.HeaderFilterStrategyAware;
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039 import org.apache.cxf.attachment.AttachmentImpl;
040 import org.apache.cxf.binding.soap.SoapHeader;
041 import org.apache.cxf.endpoint.Client;
042 import org.apache.cxf.endpoint.Endpoint;
043 import org.apache.cxf.frontend.MethodDispatcher;
044 import org.apache.cxf.headers.Header;
045 import org.apache.cxf.helpers.CastUtils;
046 import org.apache.cxf.jaxws.context.WrappedMessageContext;
047 import org.apache.cxf.message.Attachment;
048 import org.apache.cxf.message.Message;
049 import org.apache.cxf.message.MessageContentsList;
050 import org.apache.cxf.service.Service;
051 import org.apache.cxf.service.model.BindingMessageInfo;
052 import org.apache.cxf.service.model.BindingOperationInfo;
053
054 /**
055 * The Default CXF binding implementation.
056 *
057 * @version $Revision: 17638 $
058 */
059 public class DefaultCxfBinding implements CxfBinding, HeaderFilterStrategyAware {
060 private static final Log LOG = LogFactory.getLog(DefaultCxfBinding.class);
061 private HeaderFilterStrategy headerFilterStrategy;
062
063 // CxfBinding Methods
064 // -------------------------------------------------------------------------
065
066 /**
067 * <p>
068 * This method is called by {@link CxfProducer#process(Exchange)}. It populates
069 * the CXF exchange and invocation context (i.e. request/response) contexts, it
070 * but does not create and populate a CXF message as a ClientImpl's invoke method
071 * will create a new CXF Message. That method will put all properties from the
072 * CXF exchange and request context to the CXF message.
073 * </p>
074 */
075 public void populateCxfRequestFromExchange(
076 org.apache.cxf.message.Exchange cxfExchange, Exchange camelExchange,
077 Map<String, Object> requestContext) {
078
079 // propagate request context
080 Map<String, Object> camelHeaders = camelExchange.getIn().getHeaders();
081 extractInvocationContextFromCamel(camelExchange, camelHeaders,
082 requestContext, Client.REQUEST_CONTEXT);
083
084 // propagate headers
085 propagateHeadersFromCamelToCxf(camelExchange, camelHeaders, cxfExchange,
086 requestContext);
087
088 // propagate attachments
089 Set<Attachment> attachments = null;
090 for (Map.Entry<String, DataHandler> entry : camelExchange.getIn().getAttachments().entrySet()) {
091 if (attachments == null) {
092 attachments = new HashSet<Attachment>();
093 }
094 AttachmentImpl attachment = new AttachmentImpl(entry.getKey(), entry.getValue());
095 attachment.setXOP(true); // only supports MTOM
096 attachments.add(attachment);
097 }
098
099 if (attachments != null) {
100 requestContext.put(CxfConstants.CAMEL_CXF_ATTACHMENTS, attachments);
101 }
102 }
103
104 /**
105 * This method is called by {@link CxfProducer#process(Exchange)}. It propagates
106 * information from CXF Exchange to Camel Exchange. The CXF Exchange contains a
107 * request from a CXF server.
108 */
109 public void populateExchangeFromCxfResponse(Exchange camelExchange,
110 org.apache.cxf.message.Exchange cxfExchange,
111 Map<String, Object> responseContext) {
112
113 Message cxfMessage = cxfExchange.getInMessage();
114
115 if (LOG.isTraceEnabled()) {
116 LOG.trace("Populate exchange from CXF response message: " + cxfMessage);
117 }
118
119 // propagate body
120 camelExchange.getOut().setBody(DefaultCxfBinding.getContentFromCxf(cxfMessage,
121 camelExchange.getProperty(CxfConstants.DATA_FORMAT_PROPERTY, DataFormat.class)));
122
123 // propagate response context
124 if (responseContext != null && responseContext.size() > 0) {
125 if (!headerFilterStrategy.applyFilterToExternalHeaders(Client.RESPONSE_CONTEXT,
126 responseContext, camelExchange)) {
127 camelExchange.getOut().setHeader(Client.RESPONSE_CONTEXT, responseContext);
128 if (LOG.isTraceEnabled()) {
129 LOG.trace("Set header = " + Client.RESPONSE_CONTEXT + " value = "
130 + responseContext);
131 }
132 }
133 }
134
135 // propagate protocol headers
136 propagateHeadersFromCxfToCamel(cxfMessage, camelExchange.getOut(), camelExchange);
137
138 if (cxfMessage.getAttachments() != null) {
139 // TODO: workaround for CXF-2503
140 try {
141 cxfMessage.getAttachments().size();
142 } catch (java.util.ConcurrentModificationException e) {
143 // ignore
144 }
145 // end of workaround
146
147 // propagate attachments
148 for (Attachment attachment : cxfMessage.getAttachments()) {
149 camelExchange.getOut().addAttachment(attachment.getId(), attachment.getDataHandler());
150 }
151 }
152 }
153
154 /**
155 * This method is called by {@link CxfConsumer}.
156 */
157 public void populateExchangeFromCxfRequest(org.apache.cxf.message.Exchange cxfExchange,
158 Exchange camelExchange) {
159
160 Method method = null;
161 QName operationName = null;
162 ExchangePattern mep = ExchangePattern.InOut;
163
164 // extract binding operation information
165 BindingOperationInfo boi = camelExchange.getProperty(BindingOperationInfo.class.getName(),
166 BindingOperationInfo.class);
167 if (boi != null) {
168 Service service = (Service)cxfExchange.get(Service.class);
169 if (service != null) {
170 MethodDispatcher md = (MethodDispatcher)service
171 .get(MethodDispatcher.class.getName());
172 if (md != null) {
173 method = md.getMethod(boi);
174 }
175 }
176
177 if (boi.getOperationInfo().isOneWay()) {
178 mep = ExchangePattern.InOnly;
179 }
180
181 operationName = boi.getName();
182 }
183
184 // set operation name in header
185 if (operationName != null) {
186 camelExchange.getIn().setHeader(CxfConstants.OPERATION_NAMESPACE,
187 boi.getName().getNamespaceURI());
188 camelExchange.getIn().setHeader(CxfConstants.OPERATION_NAME,
189 boi.getName().getLocalPart());
190 if (LOG.isTraceEnabled()) {
191 LOG.trace("Set IN header: " + CxfConstants.OPERATION_NAMESPACE + "="
192 + boi.getName().getNamespaceURI());
193 LOG.trace("Set IN header: " + CxfConstants.OPERATION_NAME + "="
194 + boi.getName().getLocalPart());
195 }
196 } else if (method != null) {
197 camelExchange.getIn().setHeader(CxfConstants.OPERATION_NAME, method.getName());
198 if (LOG.isTraceEnabled()) {
199 LOG.trace("Set IN header: " + CxfConstants.OPERATION_NAME + "="
200 + method.getName());
201 }
202 }
203
204 // set message exchange pattern
205 camelExchange.setPattern(mep);
206 if (LOG.isTraceEnabled()) {
207 LOG.trace("Set exchange MEP: " + mep);
208 }
209
210 // propagate headers
211 Message cxfMessage = cxfExchange.getInMessage();
212 propagateHeadersFromCxfToCamel(cxfMessage, camelExchange.getIn(), camelExchange);
213
214 // Propagating properties from CXF Exchange to Camel Exchange has an
215 // side effect of copying reply side stuff when the producer is retried.
216 // So, we do not want to do this.
217 //camelExchange.getProperties().putAll(cxfExchange);
218
219 // propagate request context
220 Object value = cxfMessage.get(Client.REQUEST_CONTEXT);
221 if (value != null && !headerFilterStrategy.applyFilterToExternalHeaders(
222 Client.REQUEST_CONTEXT, value, camelExchange)) {
223 camelExchange.getIn().setHeader(Client.REQUEST_CONTEXT, value);
224 if (LOG.isTraceEnabled()) {
225 LOG.trace("Populate context from CXF message " + Client.REQUEST_CONTEXT
226 + " value=" + value);
227 }
228 }
229
230 // set body
231 Object body = DefaultCxfBinding.getContentFromCxf(cxfMessage,
232 camelExchange.getProperty(CxfConstants.DATA_FORMAT_PROPERTY, DataFormat.class));
233 if (body != null) {
234 camelExchange.getIn().setBody(body);
235 }
236
237 // propagate attachments
238 if (cxfMessage.getAttachments() != null) {
239 for (Attachment attachment : cxfMessage.getAttachments()) {
240 camelExchange.getIn().addAttachment(attachment.getId(), attachment.getDataHandler());
241 }
242 }
243 }
244
245 /**
246 * This method is called by {@link CxfConsumer} to populate a CXF response exchange
247 * from a Camel exchange.
248 */
249 public void populateCxfResponseFromExchange(Exchange camelExchange,
250 org.apache.cxf.message.Exchange cxfExchange) {
251
252 // create response context
253 Map<String, Object> responseContext = new HashMap<String, Object>();
254
255 // propagate response context
256 Map<String, Object> camelHeaders = camelExchange.getOut().getHeaders();
257 extractInvocationContextFromCamel(camelExchange, camelHeaders,
258 responseContext, Client.RESPONSE_CONTEXT);
259
260 propagateHeadersFromCamelToCxf(camelExchange, camelHeaders, cxfExchange,
261 responseContext);
262 // create out message
263 Endpoint ep = cxfExchange.get(Endpoint.class);
264 Message outMessage = ep.getBinding().createMessage();
265 cxfExchange.setOutMessage(outMessage);
266
267 DataFormat dataFormat = camelExchange.getProperty(CxfConstants.DATA_FORMAT_PROPERTY,
268 DataFormat.class);
269
270 // propagate contexts
271 if (dataFormat != DataFormat.POJO) {
272 // copying response context to out message seems to cause problem in POJO mode
273 outMessage.putAll(responseContext);
274 }
275 outMessage.put(Client.RESPONSE_CONTEXT, responseContext);
276
277 if (LOG.isTraceEnabled()) {
278 LOG.trace("Set out response context = " + responseContext);
279 }
280
281 // set body
282 Object outBody = DefaultCxfBinding.getBodyFromCamel(camelExchange.getOut(), dataFormat);
283
284 if (outBody != null) {
285 if (dataFormat == DataFormat.PAYLOAD) {
286 CxfPayload<?> payload = (CxfPayload<?>)outBody;
287 outMessage.put(List.class, payload.getBody());
288 outMessage.put(Header.HEADER_LIST, payload.getHeaders());
289 } else {
290 if (responseContext.get(Header.HEADER_LIST) != null) {
291 outMessage.put(Header.HEADER_LIST, responseContext.get(Header.HEADER_LIST));
292 }
293
294 MessageContentsList resList = null;
295 if (outBody instanceof MessageContentsList) {
296 resList = (MessageContentsList)outBody;
297 } else if (outBody instanceof List) {
298 resList = new MessageContentsList((List<?>)outBody);
299 } else if (outBody.getClass().isArray()) {
300 resList = new MessageContentsList((Object[])outBody);
301 } else {
302 resList = new MessageContentsList(outBody);
303 }
304
305 if (resList != null) {
306 outMessage.setContent(List.class, resList);
307 if (LOG.isTraceEnabled()) {
308 LOG.trace("Set Out CXF message content = " + resList);
309 }
310 }
311 }
312 }
313
314 // propagate attachments
315 Set<Attachment> attachments = null;
316 for (Map.Entry<String, DataHandler> entry : camelExchange.getOut().getAttachments().entrySet()) {
317 if (attachments == null) {
318 attachments = new HashSet<Attachment>();
319 }
320 AttachmentImpl attachment = new AttachmentImpl(entry.getKey(), entry.getValue());
321 attachment.setXOP(true); // only supports MTOM
322 attachments.add(attachment);
323 }
324
325 if (attachments != null) {
326 outMessage.setAttachments(attachments);
327 }
328
329 BindingOperationInfo boi = cxfExchange.get(BindingOperationInfo.class);
330 if (boi != null) {
331 cxfExchange.put(BindingMessageInfo.class, boi.getOutput());
332 }
333
334 }
335
336
337 // HeaderFilterStrategyAware Methods
338 // -------------------------------------------------------------------------
339
340
341 public HeaderFilterStrategy getHeaderFilterStrategy() {
342 return headerFilterStrategy;
343 }
344
345 public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
346 this.headerFilterStrategy = strategy;
347 }
348
349
350 // Non public methods
351 // -------------------------------------------------------------------------
352
353 /**
354 * @param camelExchange
355 * @param cxfContext Request or Response context
356 * @param camelHeaders
357 * @param contextKey
358 */
359 @SuppressWarnings("unchecked")
360 protected void extractInvocationContextFromCamel(Exchange camelExchange,
361 Map<String, Object> camelHeaders, Map<String, Object> cxfContext,
362 String contextKey) {
363
364 // extract from header
365 Map context = (Map)camelHeaders.get(contextKey);
366 if (context != null) {
367 cxfContext.putAll(context);
368 if (LOG.isTraceEnabled()) {
369 LOG.trace("Propagate " + contextKey + " from header context = "
370 + ((context instanceof WrappedMessageContext)
371 ? ((WrappedMessageContext)context).getWrappedMap()
372 : context));
373 }
374 }
375
376 // extract from exchange property
377 context = (Map)camelExchange.getProperty(contextKey);
378 if (context != null) {
379 cxfContext.putAll(context);
380 if (LOG.isTraceEnabled()) {
381 LOG.trace("Propagate " + contextKey + " from exchange property context = "
382 + ((context instanceof WrappedMessageContext)
383 ? ((WrappedMessageContext)context).getWrappedMap()
384 : context));
385 }
386 }
387
388 // copy camel exchange properties into context
389 if (camelExchange.getProperties() != null) {
390 cxfContext.putAll(camelExchange.getProperties());
391 }
392
393 camelExchange.setProperty(contextKey, cxfContext);
394 }
395
396 /**
397 * @param cxfMessage
398 * @param camelMessage
399 * @param exchange provides context for filtering
400 */
401 protected void propagateHeadersFromCxfToCamel(Message cxfMessage,
402 org.apache.camel.Message camelMessage, Exchange exchange) {
403
404 Map<String, List<String>> cxfHeaders = (Map)cxfMessage.get(Message.PROTOCOL_HEADERS);
405 Map<String, Object> camelHeaders = camelMessage.getHeaders();
406
407 if (cxfHeaders != null) {
408 for (Map.Entry<String, List<String>> entry : cxfHeaders.entrySet()) {
409 if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(),
410 entry.getValue(), exchange)) {
411 camelHeaders.put(entry.getKey(), entry.getValue().get(0));
412 if (LOG.isTraceEnabled()) {
413 LOG.trace("Populate header from CXF header=" + entry.getKey() + " value="
414 + entry.getValue());
415 }
416 }
417 }
418 }
419
420
421 // propagate SOAP/protocol header list
422 String key = Header.HEADER_LIST;
423 Object value = cxfMessage.get(key);
424 if (value != null) {
425 if (!headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
426 camelHeaders.put(key, value);
427 if (LOG.isTraceEnabled()) {
428 LOG.trace("Populate header from CXF header=" + key + " value=" + value);
429 }
430 } else {
431 ((List<?>)value).clear();
432 }
433 }
434 }
435
436 protected void propagateHeadersFromCamelToCxf(Exchange camelExchange,
437 Map<String, Object> camelHeaders,
438 org.apache.cxf.message.Exchange cxfExchange,
439 Map<String, Object> cxfContext) {
440
441 // get cxf transport headers (if any) from camel exchange
442 Map<String, List<String>> transportHeaders = new HashMap<String, List<String>>();
443 if (camelExchange != null) {
444 Map<String, List<String>> h = (Map)camelExchange.getProperty(Message.PROTOCOL_HEADERS);
445 if (h != null) {
446 transportHeaders.putAll(h);
447 }
448 }
449 Map<String, List<String>> headers = (Map)camelHeaders.get(Message.PROTOCOL_HEADERS);
450 if (headers != null) {
451 transportHeaders.putAll(headers);
452 }
453
454 for (Map.Entry<String, Object> entry : camelHeaders.entrySet()) {
455 // this header should be filtered, continue to the next header
456 if (headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), camelExchange)) {
457 continue;
458 }
459
460 if (LOG.isTraceEnabled()) {
461 LOG.trace("Propagate to CXF header: " + entry.getKey() + " value: " + entry.getValue());
462 }
463
464 // put response code in request context so it will be copied to CXF message's property
465 if (Message.RESPONSE_CODE.equals(entry.getKey())) {
466 cxfContext.put(entry.getKey(), entry.getValue());
467 continue;
468 }
469
470 // put SOAP/protocol header list in exchange
471 if (Header.HEADER_LIST.equals(entry.getKey())) {
472 List<Header> headerList = (List<Header>)entry.getValue();
473 for (Header header : headerList) {
474 header.setDirection(Header.Direction.DIRECTION_OUT);
475 if (LOG.isTraceEnabled()) {
476 LOG.trace("Propagate SOAP/protocol header: " + header.getName() + " : " + header.getObject());
477 }
478 }
479
480 //cxfExchange.put(Header.HEADER_LIST, headerList);
481 cxfContext.put(entry.getKey(), headerList);
482 continue;
483 }
484
485 // things that are not filtered and not specifically copied will be put in transport headers
486 if (entry.getValue() instanceof List) {
487 transportHeaders.put(entry.getKey(), (List<String>)entry.getValue());
488 } else {
489 List<String> listValue = new ArrayList<String>();
490 listValue.add(entry.getValue().toString());
491 transportHeaders.put(entry.getKey(), listValue);
492 }
493
494 }
495
496 if (transportHeaders.size() > 0) {
497 cxfContext.put(Message.PROTOCOL_HEADERS, transportHeaders);
498 }
499 }
500
501 protected static Object getContentFromCxf(Message message, DataFormat dataFormat) {
502 Set<Class<?>> contentFormats = message.getContentFormats();
503 Object answer = null;
504 if (contentFormats != null) {
505
506 if (LOG.isTraceEnabled()) {
507 for (Class<?> contentFormat : contentFormats) {
508 LOG.trace("Content format=" + contentFormat + " value="
509 + message.getContent(contentFormat));
510 }
511 }
512
513 if (dataFormat == DataFormat.POJO) {
514 answer = message.getContent(List.class);
515 if (answer == null) {
516 answer = message.getContent(Object.class);
517 if (answer != null) {
518 answer = new MessageContentsList(answer);
519 }
520 }
521 } else if (dataFormat == DataFormat.PAYLOAD) {
522 // TODO handle other message types in the future. Currently, this binding only
523 // deal with SOAP in PayLoad mode.
524 List<Element> body = message.get(List.class);
525 List<SoapHeader> headers = CastUtils.cast((List<?>)message.get(Header.HEADER_LIST));
526 answer = new CxfPayload<SoapHeader>(headers, body);
527 } else if (dataFormat == DataFormat.MESSAGE) {
528 answer = message.getContent(InputStream.class);
529 }
530
531 if (LOG.isTraceEnabled()) {
532 LOG.trace("Extracted body from CXF message = " + answer);
533 }
534 }
535 return answer;
536 }
537
538 public static Object getBodyFromCamel(org.apache.camel.Message out,
539 DataFormat dataFormat) {
540 Object answer = null;
541
542 if (dataFormat == DataFormat.POJO) {
543 answer = out.getBody();
544 } else if (dataFormat == DataFormat.PAYLOAD) {
545 answer = out.getBody(CxfPayload.class);
546 } else if (dataFormat == DataFormat.MESSAGE) {
547 answer = out.getBody(InputStream.class);
548 }
549 return answer;
550 }
551
552 public void copyJaxWsContext(org.apache.cxf.message.Exchange cxfExchange, Map<String, Object> context) {
553 if (cxfExchange.getOutMessage() != null) {
554 org.apache.cxf.message.Message outMessage = cxfExchange.getOutMessage();
555 for (Map.Entry<String, Object> entry : context.entrySet()) {
556 if (outMessage.get(entry.getKey()) == null) {
557 outMessage.put(entry.getKey(), entry.getValue());
558 }
559 }
560 }
561 }
562
563 public void extractJaxWsContext(org.apache.cxf.message.Exchange cxfExchange, Map<String, Object> context) {
564 org.apache.cxf.message.Message inMessage = cxfExchange.getInMessage();
565 for (Map.Entry<String, Object> entry : inMessage.entrySet()) {
566 if (entry.getKey().startsWith("javax.xml.ws")) {
567 context.put(entry.getKey(), entry.getValue());
568 }
569 }
570
571 }
572
573 }