Thread: Threaded streaming of MTOM attachments

 
shanaghe

Posted: Oct 21, 2009 3:33 PM
 
Hi,

I have been prototyping a few scenarios involving the transfer of large attachments with MTOM using CXF (v2.2.4). It's working very well, with the exception of one case. We have a web service that is required to read in a large amount (~50MB) of data, interpret it, remarshal it and return it. Ideally, this should be done on the fly, i.e. the data would be remarshalled and streamed out while the input stream is still being read. For this prototype, I developed a service that takes a file sent over MTOM and gzips it using a GZIPOutputStream. The generated service has this method:

public javax.activation.DataHandler gzipData(
javax.activation.DataHandler data);

The code for the DataSource behind the server's DataHandler is below. Essentially, a thread is started to pump the input stream through to the response stream, so that control can be returned to the invoking thread and the WS invocation can return.

If I try this with a CXF client, I notice that only one thread is ever used by CXF. This means that reading of the response attachment only starts after the request attachment has been completely uploaded. This naturally results in a deadlock for my service, since the server stops consuming the request once the response it is producing has nowhere to go.
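
The stall on the server side can be reproduced outside CXF. Below is a minimal sketch, assuming the response is fed through a java.io pipe as in my service. `PipeBackpressureDemo` and `bytesWrittenWithoutReader` are names made up for this illustration and are not part of CXF. It shows that a writer on a `PipedOutputStream` blocks as soon as the pipe's buffer is full and nobody is reading the other end.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: not CXF code, only an illustration of pipe back-pressure.
final class PipeBackpressureDemo {

    /**
     * Starts a writer that tries to push toWrite bytes into a pipe of the
     * given capacity while nobody reads from it, waits up to waitMillis,
     * and returns how many bytes the writer managed to write by then.
     */
    static int bytesWrittenWithoutReader(int capacity, final int toWrite, long waitMillis)
            throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream(capacity);
        final PipedOutputStream out = new PipedOutputStream(in);
        final AtomicInteger written = new AtomicInteger();

        Thread writer = new Thread("blocked-writer") {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < toWrite; i++) {
                        out.write(0); // blocks once the pipe buffer is full
                        written.incrementAndGet();
                    }
                } catch (IOException e) {
                    // Expected here: the demo interrupts the blocked writer below.
                }
            }
        };
        writer.setDaemon(true);
        writer.start();
        writer.join(waitMillis); // returns early if the writer finishes
        int result = written.get();
        writer.interrupt(); // release the writer if it is still blocked
        return result;
    }

    public static void main(String[] args) throws Exception {
        int n = bytesWrittenWithoutReader(1024, 4096, 500);
        System.out.println("Writer stalled after " + n + " of 4096 bytes");
    }
}
```

In my service the pump thread plays the role of the writer: once it blocks, it stops reading the request, and a client that is still uploading on its only thread never gets round to reading the response.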

This link gave me the impression that CXF would use threads, but looking through the CXF code and following the stack trace in my client (pasted at the end of this message), I can't see how it would work.
Is there any way to get CXF to do the client streaming in a thread, or am I stuck with writing the response to a temporary file before sending it back?
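
For reference, the temporary-file fallback mentioned above could look roughly like this. It is a minimal sketch using only the JDK. The class name `GzipToTempFile`, the method name and the temp-file prefix are placeholders chosen for illustration, and cleanup of the file after the response has been sent is left out.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: spools the gzipped response to disk before it is returned.
final class GzipToTempFile {

    /** Gzips everything readable from in into a new temporary file and returns that file. */
    static File gzipToTempFile(InputStream in) throws IOException {
        File tmp = File.createTempFile("gzip-response", ".gz");
        tmp.deleteOnExit(); // safety net only; deleting after the response is sent is better
        try (OutputStream out = new GZIPOutputStream(new FileOutputStream(tmp))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } // closing writes the gzip trailer and closes the file
        return tmp;
    }
}
```

The service method would then consume the whole request before returning, for example by returning `new DataHandler(new FileDataSource(tmp))` from javax.activation. This avoids the deadlock at the cost of disk I/O and of losing the on-the-fly streaming I was hoping for.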

Thanks for the help,
Eoin

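import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

import javax.activation.DataSource;

public class GZIPDataSource implements DataSource {
    // LOG was not declared in the pasted code; java.util.logging is assumed here.
    private static final Logger LOG = Logger.getLogger(GZIPDataSource.class.getName());

    private PipedInputStream responsePipedIS = null;
    private PipedOutputStream responsePipedOS = null;

    public GZIPDataSource(final InputStream inputStream) {
        responsePipedIS = new PipedInputStream();
        try {
            responsePipedOS = new PipedOutputStream(responsePipedIS);
        } catch (IOException e1) {
            throw new RuntimeException(e1);
        }
        // Pump the request stream through gzip into the pipe on a separate
        // thread, so the invoking thread can return from the WS invocation.
        Thread thread = new Thread("DataHandler Pipe Pump") {
            public void run() {
                try {
                    int total = 0;
                    GZIPOutputStream gzos = new GZIPOutputStream(responsePipedOS);
                    try {
                        byte[] buffer = new byte[4096];
                        while (true) {
                            int read = inputStream.read(buffer);
                            if (read <= 0) {
                                break;
                            }
                            gzos.write(buffer, 0, read);
                            gzos.flush();
                            total += read;
                            LOG.info("Wrote " + total + " bytes");
                        }
                    } finally {
                        LOG.info("Finished writing");
                        // Closing the gzip stream also closes the pipe, which
                        // signals end of stream to the reader.
                        gzos.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        thread.start();
    }

    public String getContentType() {
        return "application/octet-stream";
    }

    public InputStream getInputStream() throws IOException {
        return responsePipedIS;
    }

    public String getName() {
        return null;
    }

    // This data source is read-only.
    public OutputStream getOutputStream() throws IOException {
        return null;
    }
}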
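
The pump only makes progress when something drains the pipe on a different thread from the one feeding the input. Here is a self-contained sketch of that arrangement, assuming the same pump idea as GZIPDataSource but without the DataSource wrapper so that it runs with the JDK alone. `PipePumpRoundTrip`, `gzipThroughPipe` and `roundTrip` are illustrative names only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class: the pump from GZIPDataSource with a concurrent reader.
final class PipePumpRoundTrip {

    /** Starts a pump thread that gzips source into a pipe and returns the pipe's read end. */
    static InputStream gzipThroughPipe(final InputStream source) throws IOException {
        PipedInputStream pipedIn = new PipedInputStream();
        final PipedOutputStream pipedOut = new PipedOutputStream(pipedIn);
        Thread pump = new Thread("pipe-pump") {
            @Override
            public void run() {
                try (GZIPOutputStream gzos = new GZIPOutputStream(pipedOut)) {
                    byte[] buffer = new byte[4096];
                    int read;
                    while ((read = source.read(buffer)) != -1) {
                        gzos.write(buffer, 0, read);
                        gzos.flush(); // hands pending bytes to the pipe and wakes a waiting reader
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        pump.setDaemon(true);
        pump.start();
        return pipedIn;
    }

    /** Drains the gzipped pipe on the calling thread and returns the inflated bytes. */
    static byte[] roundTrip(byte[] input) throws IOException {
        InputStream gz = gzipThroughPipe(new ByteArrayInputStream(input));
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        try (GZIPInputStream gunzip = new GZIPInputStream(gz)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gunzip.read(buffer)) != -1) {
                result.write(buffer, 0, read);
            }
        }
        return result.toByteArray();
    }
}
```

Here the calling thread reads while the pump thread writes, so neither side waits for long. With the CXF client the same thread has to finish sending the request before it reads anything, which is the arrangement that gets stuck.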

org.apache.cxf.attachment.AttachmentSerializer.writeAttachments(AttachmentSerializer.java:205)
org.apache.cxf.interceptor.AttachmentOutInterceptor$AttachmentOutEndingInterceptor.handleMessage(AttachmentOutInterceptor.java:92)
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:236)
org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:478)
org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:308)
org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:260)
org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:124)
$Proxy41.gzipData(Unknown Source)
WSClient.main(WSClient.java:29)