Thread: Streaming mode with custom splitter

 


Replies: 11 - Last Post: Dec 16, 2011 7:19 AM Last Post By: marks1900
dekk11

Posts: 17
Registered: 08/11/10
Streaming mode with custom splitter
Posted: Aug 16, 2010 7:01 PM
 
I have written a custom split method that processes an incoming message body and splits the payload into fixed-size byte arrays. In an attempt to process large incoming files, I enabled the streaming option of the Splitter EIP, which did not work with my custom splitter.

DSL Ex:
from("incomingUri").split().method("mySplitterProcessor","splitBodyInBytes").streaming()

Spring Ex:
<camelContext ...>
  <route ...>
    <split streaming="true">
      <method bean="myFileSplitter" method="splitBodyinBytes"/>
      <to uri="aggregatorEndPoint"/>
    </split>
  </route>
</camelContext>

Are you aware of any examples of using streaming mode with a custom split method within the Splitter EIP? Any additional information or direction would be very useful and much appreciated.

Thanks
Steve

Edited by: dekk11 on Aug 16, 2010 7:02 PM

davsclaus

Posts: 1,893
Registered: 10/14/08
Re: Streaming mode with custom splitter
Posted: Aug 17, 2010 10:40 AM   in response to: dekk11
What do you return from your custom split method?

It should be an Iterator, to work well with the streaming mode.
dekk11

Posts: 17
Registered: 08/11/10
Re: Streaming mode with custom splitter
Posted: Aug 18, 2010 5:50 PM   in response to: davsclaus
Yes, I am returning a List<Message>. I got this to work: I had been relying on the Splitter EIP to set the "CamelSplitSize" property, but it is not set in streaming mode. Once I recalculated CamelSplitSize in my split method, everything seemed to work. Streaming mode otherwise appears to be transparent.

According to "Camel in Action" (very nice book), streaming mode tells Camel not to load the entire payload into memory, but instead to iterate over the payload in a streaming fashion. In the case of a GenericFileMessage whose body is being split, does Camel read the entire file contents into memory, or only a portion at a time? How does it determine how much to read from the stream?

Example:
<route>
<from uri="file:incoming"/>
<split streaming="true">
<method bean="myFileSplitter" method="splitBodyinBytes"/>
<to uri="aggregatorEndPoint"/>
</split>
</route>

In this example, when is the file payload actually read: at the <from>, or is that delegated to the <split>? I need to confirm that large incoming files are not read entirely into memory, but are streamed and read one part at a time.
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: Streaming mode with custom splitter
Posted: Aug 19, 2010 3:36 AM   in response to: dekk11
Well, if you return a List<Message> then you have already loaded the entire file into memory.

The only difference between streaming and non-streaming mode in the splitter is whether Camel calculates the total size.

So for true streaming you need to return an Iterator<Message> which then returns the parts on-the-fly.
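
To make the List versus Iterator point concrete, here is a minimal plain-Java sketch with no Camel involved. The class name PartSource and its "produced" counter are hypothetical and exist only for illustration; the counter shows how many parts exist at the moment the caller receives the return value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical part producer; "produced" counts how many parts have been created so far. */
class PartSource {
    private final int total;
    int produced = 0;

    PartSource(int total) {
        this.total = total;
    }

    /** Eager: every part is created and held in memory before the caller sees the first one. */
    List<String> asList() {
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            parts.add(makePart());
        }
        return parts;
    }

    /** Lazy: a part is created only when the caller asks for it with next(). */
    Iterator<String> asIterator() {
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return produced < total;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return makePart();
            }
        };
    }

    private String makePart() {
        return "part-" + (produced++);
    }
}
```

After asList() returns, produced already equals the total; after asIterator() returns, produced is still 0 and only grows as next() is called.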
dekk11

Posts: 17
Registered: 08/11/10
Re: Streaming mode with custom splitter
Posted: Aug 19, 2010 9:05 PM   in response to: davsclaus
Section 8.1.3, "Splitting big messages", briefly discusses using streaming with the splitter in the context of the tokenizer, which only processes strings. It states that the tokenizer uses java.util.Scanner to read chunks of data into memory. So the Scanner reads a "stream" of data, based on a token. The Scanner object is then passed back to Camel as an Expression, which Camel then iterates over to deliver the parts. Is this correct?

I need a similar implementation, but for binary streams. Instead of getting the message body, i.e. splitBody(@Body byte[] body), I need to get the Exchange, i.e. splitBody(Exchange exchange). From the Exchange I could get the file and perform custom streaming to create my Iterator. Does this sound feasible? How do I get the Exchange passed to the custom split method?

Do you know of any implementations such as this? Or would you recommend another approach to reading large binary files? Does the Stream component (e.g. <from uri=stream:file:incoming>) provide any value here, by streaming the file so that I can then read it as streaming input?

thx again!
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: Streaming mode with custom splitter
Posted: Aug 20, 2010 1:56 PM   in response to: dekk11
If you use a custom Iterator to read the file piece by piece then that works well with the streaming mode of the Camel Splitter EIP.

This is similar to how the Scanner works. If you just want to split your file by a special token, then the Scanner is a great choice, and the Camel DSL has syntactic sugar for this via the tokenizer.
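
As a rough illustration of the Scanner behaviour described here, the sketch below uses only java.util.Scanner, which itself implements Iterator<String>. It is not Camel's tokenizer code; the class name ScannerTokens and the method tokens are hypothetical.

```java
import java.util.Iterator;
import java.util.Scanner;
import java.util.regex.Pattern;

/** Hypothetical helper showing java.util.Scanner used as a lazy token iterator. */
class ScannerTokens {
    /**
     * Returns an iterator over the tokens of the input, split on a literal delimiter.
     * The Scanner reads ahead in buffered pieces as tokens are requested,
     * rather than loading the whole input before returning.
     */
    static Iterator<String> tokens(Readable input, String delimiter) {
        // Pattern.quote treats the delimiter as literal text, not as a regular expression.
        return new Scanner(input).useDelimiter(Pattern.quote(delimiter));
    }
}
```

Calling tokens(new java.io.FileReader("big.txt"), "\n") would then hand back one line per next() call; the file name is only an example.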

If you need something more advanced/custom, then just use a custom Iterator. For example you can read in X number of bytes at each iteration step.
dekk11

Posts: 17
Registered: 08/11/10
Re: Streaming mode with custom splitter
Posted: Aug 25, 2010 5:14 PM   in response to: davsclaus
Can you provide a simple example of how a custom split method would use an Iterator to get the file piece by piece? What would the signature of the split method be?
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: Streaming mode with custom splitter
Posted: Aug 25, 2010 5:48 PM   in response to: dekk11
Just return Iterator as the return type of the method.

Then implement the Iterator so that hasNext() reports whether more data remains and next() reads and returns the next chunk of the file.

I am sure you can google and find some examples.
dekk11

Posts: 17
Registered: 08/11/10
Re: Streaming mode with custom splitter
Posted: Aug 25, 2010 6:28 PM   in response to: davsclaus
Thanks for your quick responses. I am still unclear on how the split method can get the file object from the exchange so that it can then be converted into an Iterator. How do I get the File reference from within the split method?
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: Streaming mode with custom splitter
Posted: Aug 25, 2010 8:44 PM   in response to: dekk11
Read chapter 4 in the Camel book which talks about using beans with Camel.

public Iterator foo(File file) {
   return new Iterator() {
       // implement me to return chunks of the file
   };
}
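
One possible way to fill in the stub above is sketched below. It is not code from this thread: the class name FileChunkSplitter, the method name split and the chunkSize parameter are assumptions, and InputStream.readNBytes(int) requires Java 11 or later. The iterator looks one chunk ahead so that hasNext() does not need to touch the file.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical splitter bean: yields a file as byte[] chunks of at most chunkSize bytes. */
class FileChunkSplitter {
    private final int chunkSize;

    FileChunkSplitter(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
    }

    public Iterator<byte[]> split(File file) throws IOException {
        final InputStream in = Files.newInputStream(file.toPath());
        return new Iterator<byte[]>() {
            // Holds the chunk that the next call to next() will return; null once the file is exhausted.
            private byte[] nextChunk = read();

            private byte[] read() {
                try {
                    byte[] chunk = in.readNBytes(chunkSize); // Java 11+; shorter only at end of file
                    if (chunk.length == 0) {
                        in.close(); // end of file reached, release the handle
                        return null;
                    }
                    return chunk;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return nextChunk != null;
            }

            @Override
            public byte[] next() {
                if (nextChunk == null) {
                    throw new NoSuchElementException();
                }
                byte[] current = nextChunk;
                nextChunk = read();
                return current;
            }
        };
    }
}
```

At most two chunks are held in memory at a time. One limitation of this sketch: the stream is closed only when the end of the file is reached, so a consumer that stops iterating early leaves the file handle open.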
dekk11

Posts: 17
Registered: 08/11/10
Re: Streaming mode with custom splitter
Posted: Aug 26, 2010 6:34 PM   in response to: davsclaus
Another informative chapter in the book! I implemented an Iterator that accesses the file via a FileInputStream and reads it a chunk at a time. I used bean parameter binding to access the exchange and its members. Thanks again for your assistance.

Steve
marks1900

Posts: 12
Registered: 09/01/11
Re: Streaming mode with custom splitter
Posted: Dec 15, 2011 8:39 PM   in response to: dekk11
Would you be able to provide the sample code for this example?

On the following page: http://camel.apache.org/splitter.html , there is an excerpt that I have questions about.

++ Excerpt - Start ++

import static org.apache.camel.builder.ExpressionBuilder.beanExpression;
from("direct:streaming")
.split(beanExpression(new MyCustomIteratorFactory(), "iterator"))
.streaming().to("activemq:my.parts")

++ Excerpt - End ++

Can someone provide me with sample code for the MyCustomIteratorFactory class?

Basically I have one huge file that I want to split into many JMS messages.

The following is an outline of my current thoughts...

public void configure() throws Exception
{
from( "file://file-item-list/general?initialDelay=1000&delay=5000&delete=true" )
.transacted( "PROPAGATION_REQUIRED" )
.split(ExpressionBuilder.beanExpression(new MyCustomFileChunkIterator(), "iterator")) // split big file of items into smaller chunks
.streaming()
.to("activemq:queue:file-item-sublist-request");
}

Edited by: marks1900 on Dec 16, 2011 7:18 AM