Producer endpoints normally follow a synchronous pattern when
processing an exchange. When the preceding processor in a pipeline calls
process() on a producer, the process()
method blocks until a reply is received. In this case, the processor's thread remains
blocked until the producer has completed the cycle of sending the request and receiving
the reply.
Sometimes, however, you might prefer to decouple the preceding processor from the
producer, so that the processor's thread is released immediately and the
process() call does not block. In this
case, you should implement the producer using an asynchronous
pattern, which gives the preceding processor the option of invoking a non-blocking version
of the process() method.
To give you an overview of the different implementation options, this section describes both the synchronous and the asynchronous patterns for implementing a producer endpoint.
Figure 5.6 shows an outline of a synchronous producer, where the preceding processor blocks until the producer has finished processing the exchange.
The synchronous producer processes an exchange as follows:
The preceding processor in the pipeline calls the synchronous
process() method on the producer to initiate synchronous
processing. The synchronous process() method takes a single
exchange argument.
In the body of the process() method, the producer sends
the request (In message) to the endpoint.
If required by the exchange pattern, the producer waits for the reply
(Out message) to arrive from the endpoint. This step can cause
the process() method to block indefinitely. However, if the
exchange pattern does not mandate a reply, the process()
method can return immediately after sending the request.
When the process() method returns, the exchange object
contains the reply from the synchronous call (an Out message).
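The synchronous steps above can be sketched in plain Java. This is only an illustration under stated assumptions: Exchange, SyncProducer, and the transport function are hypothetical stand-ins defined in the snippet, not the org.apache.camel types, and the remote endpoint is simulated by an in-memory function.

```java
import java.util.function.Function;

final class SyncProducerSketch {

    /** Hypothetical exchange with one In slot and one Out slot. */
    static final class Exchange {
        final String in;
        final boolean replyExpected;
        String out;

        Exchange(String in, boolean replyExpected) {
            this.in = in;
            this.replyExpected = replyExpected;
        }
    }

    /** Synchronous producer: process() returns only after the cycle is complete. */
    static final class SyncProducer {
        private final Function<String, String> transport; // simulated remote endpoint

        SyncProducer(Function<String, String> transport) {
            this.transport = transport;
        }

        void process(Exchange exchange) {
            // Send the In message. The calling thread stays inside apply()
            // until the simulated endpoint has produced its answer.
            String reply = transport.apply(exchange.in);
            // Store the reply only if the exchange pattern calls for one.
            if (exchange.replyExpected) {
                exchange.out = reply;
            }
        }
    }

    public static void main(String[] args) {
        SyncProducer producer = new SyncProducer(request -> "reply to " + request);
        Exchange exchange = new Exchange("ping", true);
        producer.process(exchange); // the caller is blocked for the whole round trip
        System.out.println(exchange.out);
    }
}
```

The point of the sketch is that the caller's thread is the one that performs the send and the wait, which is exactly what the asynchronous pattern avoids.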
Figure 5.7 shows an outline of an asynchronous producer, where the producer processes the exchange in a sub-thread, and the preceding processor is not blocked for any significant length of time.
The asynchronous producer processes an exchange as follows:
Before the processor can call the asynchronous process()
method, it must create an asynchronous callback object, which
is responsible for processing the exchange on the return portion of the route. For the
asynchronous callback, the processor must provide a class that implements the
AsyncCallback interface.
The processor calls the asynchronous process() method on
the producer to initiate asynchronous processing. The asynchronous
process() method takes two arguments:
an exchange object
an asynchronous callback object
In the body of the process() method, the producer creates
a Runnable object that encapsulates the processing code. The producer
then delegates the execution of this Runnable object to a
sub-thread.
The asynchronous process() method returns, thereby
freeing up the processor's thread. The exchange processing continues in a separate
sub-thread.
The Runnable object sends the In message to the
endpoint.
If required by the exchange pattern, the Runnable object waits for
the reply (Out or Fault message) to arrive
from the endpoint. The Runnable object remains blocked until the reply is
received.
After the reply arrives, the Runnable object inserts the reply
(Out message) into the exchange object and then calls
done() on the asynchronous callback object. The
asynchronous callback is then responsible for processing the reply message (executed
in the sub-thread).
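The asynchronous steps above can be sketched as follows. The types here (Exchange, AsyncCallback, AsyncProducer) are local stand-ins that only mirror the shape described in the text; the helper runOnce() and the simulated transport function are assumptions made for illustration, and a real component would implement the Camel interfaces instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class AsyncProducerSketch {

    static final class Exchange {
        final String in;
        volatile String out;
        Exchange(String in) { this.in = in; }
    }

    /** Local stand-in for the callback contract described in the text. */
    interface AsyncCallback {
        void done(boolean doneSynchronously);
    }

    static final class AsyncProducer {
        private final ExecutorService workers;
        private final Function<String, String> transport; // simulated remote endpoint

        AsyncProducer(ExecutorService workers, Function<String, String> transport) {
            this.workers = workers;
            this.transport = transport;
        }

        /** Returns false to signal that processing continues in another thread. */
        boolean process(final Exchange exchange, final AsyncCallback callback) {
            Runnable task = () -> {
                // Runs in the sub-thread: send the request and wait for the reply.
                exchange.out = transport.apply(exchange.in);
                // Hand over to the return portion of the route.
                callback.done(false);
            };
            workers.execute(task);
            return false; // the caller's thread is released immediately
        }
    }

    /** Hypothetical helper: pushes one request through and waits for the callback. */
    static String runOnce(String request) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try {
            AsyncProducer producer = new AsyncProducer(workers, r -> "reply to " + r);
            Exchange exchange = new Exchange(request);
            CountDownLatch finished = new CountDownLatch(1);
            boolean doneSync = producer.process(exchange, sync -> finished.countDown());
            if (doneSync || !finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not invoked asynchronously");
            }
            return exchange.out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("ping"));
    }
}
```

The latch exists only so that the demonstration can observe completion; in a route, the callback itself would continue the processing.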
The pattern used to implement the consumer determines the threading model used in processing the incoming exchanges. Consumers can be implemented using one of the following patterns:
Event-driven pattern—The consumer is driven by an external thread.
Scheduled poll pattern—The consumer is driven by a dedicated thread pool.
Polling pattern—The threading model is left undefined.
In the event-driven pattern, the processing of an incoming request is initiated when
another part of the application (typically a third-party library) calls a method
implemented by the consumer. A good example of an event-driven consumer is the Fuse Mediation Router
JMX component, where events are initiated by the JMX library. The JMX library calls the
handleNotification() method to initiate request
processing—see Example 8.3 for
details.
Figure 5.3 shows an
outline of the event-driven consumer pattern. In this example, it is assumed that
processing is triggered by a call to the
notify() method.
The event-driven consumer processes incoming requests as follows:
The consumer must implement a method to receive the incoming event (in Figure 5.3 this is represented by the
notify() method). The thread
that calls notify() is normally a
separate part of the application, so the consumer's threading policy is externally
driven.
For example, in the case of the JMX consumer implementation, the consumer
implements the NotificationListener.handleNotification()
method to receive notifications from JMX. The threads that drive the consumer
processing are created within the JMX layer.
In the body of the notify()
method, the consumer first converts the incoming event into an exchange object,
E, and then calls process() on the next
processor in the route, passing the exchange object as its argument.
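A minimal sketch of the event-driven steps, assuming a hypothetical listener interface EventListener whose onEvent() method plays the role of notify() in the figure. Exchange and Processor are simplified local types, not the Camel interfaces.

```java
import java.util.ArrayList;
import java.util.List;

final class EventDrivenConsumerSketch {

    static final class Exchange {
        final Object body;
        Exchange(Object body) { this.body = body; }
    }

    interface Processor {
        void process(Exchange exchange);
    }

    /** Hypothetical listener interface owned by the external event source. */
    interface EventListener {
        void onEvent(Object event);
    }

    static final class EventDrivenConsumer implements EventListener {
        private final Processor next;

        EventDrivenConsumer(Processor next) {
            this.next = next;
        }

        @Override
        public void onEvent(Object event) {
            // Runs on whichever thread the event source uses to deliver the event.
            Exchange exchange = new Exchange(event); // convert the event to an exchange
            next.process(exchange);                  // pass it to the next processor
        }
    }

    public static void main(String[] args) {
        List<Object> seen = new ArrayList<>();
        EventDrivenConsumer consumer = new EventDrivenConsumer(ex -> seen.add(ex.body));
        consumer.onEvent("disk-full"); // simulates the external library calling the listener
        System.out.println(seen);
    }
}
```

Note that the consumer creates no threads of its own, which is what "externally driven" means here.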
In the scheduled poll pattern, the consumer retrieves incoming requests by checking at regular time intervals whether or not a request has arrived. Checking for requests is scheduled automatically by a built-in timer class, the scheduled executor service, which is a standard pattern provided by the java.util.concurrent library. The scheduled executor service executes a particular task at timed intervals and it also manages a pool of threads, which are used to run the task instances.
Figure 5.4 shows an outline of the scheduled poll consumer pattern.
The scheduled poll consumer processes incoming requests as follows:
The scheduled executor service has a pool of threads at its disposal, which can be
used to initiate consumer processing. After each scheduled time interval has elapsed,
the scheduled executor service attempts to grab a free thread from its pool (there are
five threads in the pool by default). If a free thread is available, it uses that
thread to call the poll() method on the consumer.
The consumer's poll() method is intended to trigger
processing of an incoming request. In the body of the poll()
method, the consumer attempts to retrieve an incoming message. If no request is
available, the poll() method returns immediately.
If a request message is available, the consumer inserts it into an exchange object
and then calls process() on the next processor in the route,
passing the exchange object as its argument.
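The scheduled poll steps can be sketched with the standard java.util.concurrent scheduler. This is an illustration only: the consumer class, the in-memory queue that stands in for the message source, the pool size of 2, the 10 ms delay, and the collect() helper are all assumptions chosen for the example, not Camel defaults.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ScheduledPollSketch {

    static final class Exchange {
        final String body;
        Exchange(String body) { this.body = body; }
    }

    interface Processor {
        void process(Exchange exchange);
    }

    static final class PollConsumer {
        private final Queue<String> source;
        private final Processor next;

        PollConsumer(Queue<String> source, Processor next) {
            this.source = source;
            this.next = next;
        }

        /** Called by the scheduler; returns immediately if nothing is waiting. */
        void poll() {
            String message = source.poll();
            if (message == null) {
                return;
            }
            next.process(new Exchange(message));
        }
    }

    /** Hypothetical helper: polls until every queued message has been processed. */
    static List<String> collect(String... messages) {
        Queue<String> source = new ConcurrentLinkedQueue<>(Arrays.asList(messages));
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(messages.length);
        PollConsumer consumer = new PollConsumer(source, ex -> {
            processed.add(ex.body);
            done.countDown();
        });
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            // One poll() call per interval, run on a thread taken from the pool.
            scheduler.scheduleWithFixedDelay(consumer::poll, 0, 10, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for polls");
            }
            return new ArrayList<>(processed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect("a", "b"));
    }
}
```

Because scheduleWithFixedDelay() never overlaps runs of the same task, the messages in this sketch are processed one per interval, in queue order.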
In the polling pattern, processing of an incoming request is initiated when a third party calls one of the consumer's polling methods:
receive()
receiveNoWait()
receive(long timeout)
It is up to the component implementation to define the precise mechanism for initiating calls on the polling methods. This mechanism is not specified by the polling pattern.
Figure 5.5 shows an outline of the polling consumer pattern.
The polling consumer processes incoming requests as follows:
Processing of an incoming request is initiated whenever one of the consumer's polling methods is called. The mechanism for calling these polling methods is implementation defined.
In the body of the receive() method, the consumer
attempts to retrieve an incoming request message. If no message is currently
available, the behavior depends on which receive method was called.
receiveNoWait() returns immediately
receive(long timeout) waits for the specified timeout interval before returning
receive() waits until a message is received
If a request message is available, the consumer inserts it into an exchange object
and then calls process() on the next processor in the route,
passing the exchange object as its argument.
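The difference between the three polling methods can be sketched on top of a java.util.concurrent.BlockingQueue. The class QueuePollingConsumer and its in-memory queue are hypothetical; the sketch returns the exchange to the caller and, as a simplification, returns null if the waiting thread is interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollingConsumerSketch {

    static final class Exchange {
        final String body;
        Exchange(String body) { this.body = body; }
    }

    static final class QueuePollingConsumer {
        private final BlockingQueue<String> source;

        QueuePollingConsumer(BlockingQueue<String> source) {
            this.source = source;
        }

        /** Waits until a message is received. */
        Exchange receive() {
            try {
                return new Exchange(source.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        /** Returns immediately; null means nothing was waiting. */
        Exchange receiveNoWait() {
            String message = source.poll();
            return message == null ? null : new Exchange(message);
        }

        /** Waits for at most the given number of milliseconds. */
        Exchange receive(long timeoutMillis) {
            try {
                String message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                return message == null ? null : new Exchange(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        QueuePollingConsumer consumer = new QueuePollingConsumer(queue);
        System.out.println(consumer.receiveNoWait() == null); // queue is still empty
        queue.add("order-1");
        System.out.println(consumer.receive().body);
    }
}
```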
A Fuse Mediation Router component consists of a set of classes that are related to each other
through a factory pattern. The primary entry point to a component is the
Component object itself (an instance of
org.apache.camel.Component type). You can use the Component
object as a factory to create Endpoint objects, which in turn act as
factories for creating Consumer, Producer, and
Exchange objects. These relationships are summarized in Figure 5.1.
A component implementation is an endpoint factory. The main task of a component
implementor is to implement the Component.createEndpoint()
method, which is responsible for creating new endpoints on demand.
Each kind
of component must be associated with a component prefix that
appears in an endpoint URI. For example, the file component is usually associated with the
file prefix, which can be used in an endpoint URI like
file://tmp/messages/input. When you install a new component in Fuse Mediation Router, you
must define the association between a particular component prefix and the name of the
class that implements the component.
Each endpoint instance encapsulates a particular endpoint URI. Every time Fuse Mediation Router encounters a new endpoint URI, it creates a new endpoint instance. An endpoint object is also a factory for creating consumer endpoints and producer endpoints.
Endpoints must implement the
org.apache.camel.Endpoint interface. The
Endpoint interface defines the following factory
methods:
createConsumer() and
createPollingConsumer()—Creates a consumer endpoint,
which represents the source endpoint at the beginning of a route.
createProducer()—Creates a producer endpoint, which
represents the target endpoint at the end of a route.
createExchange()—Creates an exchange object, which
encapsulates the messages passed up and down the route.
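The factory chain (component creates endpoints, endpoint creates producers and exchanges) can be sketched with simplified local interfaces. Everything below is an assumption made for illustration: the interfaces are cut-down stand-ins for the Camel ones, the consumer side is omitted for brevity, and MemoryComponent with its memory: prefix is a hypothetical component.

```java
import java.util.ArrayList;
import java.util.List;

final class FactoryPatternSketch {

    static final class Exchange {
        final String body;
        Exchange(String body) { this.body = body; }
    }

    interface Processor {
        void process(Exchange exchange);
    }

    interface Producer extends Processor { }

    interface Endpoint {
        Producer createProducer();
        Exchange createExchange(String body);
    }

    interface Component {
        Endpoint createEndpoint(String uri);
    }

    /** Hypothetical component whose producers record message bodies in memory. */
    static final class MemoryComponent implements Component {
        final List<String> sent = new ArrayList<>();

        @Override
        public Endpoint createEndpoint(final String uri) {
            // Each endpoint instance encapsulates one endpoint URI.
            return new Endpoint() {
                @Override
                public Producer createProducer() {
                    return exchange -> sent.add(uri + " <- " + exchange.body);
                }

                @Override
                public Exchange createExchange(String body) {
                    return new Exchange(body);
                }
            };
        }
    }

    public static void main(String[] args) {
        MemoryComponent component = new MemoryComponent();
        Endpoint endpoint = component.createEndpoint("memory:orders");
        Producer producer = endpoint.createProducer();
        producer.process(endpoint.createExchange("hello"));
        System.out.println(component.sent);
    }
}
```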
Consumer endpoints consume requests. They always appear at the start of a route and they encapsulate the code responsible for receiving incoming requests and dispatching outgoing replies. From a service-oriented perspective, a consumer represents a service.
Consumers must implement the
org.apache.camel.Consumer interface. There are a number
of different patterns you can follow when implementing a consumer. These patterns are
described in Consumer Patterns and Threading.
Producer endpoints produce requests. They always appear at the end of a route and they encapsulate the code responsible for dispatching outgoing requests and receiving incoming replies. From a service-oriented perspective, a producer represents a service consumer.
Producers must implement the
org.apache.camel.Producer interface. You can optionally
implement the producer to support an asynchronous style of processing. See Asynchronous Processing for details.
Exchange objects encapsulate a related set of messages. For example, one kind of message exchange is a synchronous invocation, which consists of a request message and its related reply.
Exchanges must implement the
org.apache.camel.Exchange interface. The default
implementation, DefaultExchange, is sufficient for many component
implementations. However, if you want to associate extra data with the exchanges or have
the exchanges perform additional processing, it can be useful to customize the exchange
implementation.
There are two different message slots in an Exchange object:
In message—holds the current message.
Out message—temporarily holds a reply message.
All of the message types are represented by the same Java object,
org.apache.camel.Message. It is not always necessary to customize the
message implementation—the default implementation,
DefaultMessage, is usually adequate.
A Fuse Mediation Router route is essentially a pipeline of processors, of
org.apache.camel.Processor type. Messages are
encapsulated in an exchange object, E, which gets passed from node to node by
invoking the process() method. The architecture of the processor pipeline is
illustrated in Figure 5.2.
At the start of the route, you have the source endpoint, which is represented by an
org.apache.camel.Consumer object. The source endpoint is responsible for
accepting incoming request messages and dispatching replies. When constructing the route,
Fuse Mediation Router creates the appropriate Consumer type based on the component prefix
from the endpoint URI, as described in Factory Patterns for a Component.
Each intermediate node in the pipeline is represented by a processor object
(implementing the org.apache.camel.Processor interface).
You can insert either standard processors (for example, filter,
throttler, or delayer) or insert your own custom processor
implementations.
At the end of the route is the target endpoint, which is represented by an
org.apache.camel.Producer object. Because it comes at the end of a
processor pipeline, the producer is also a processor object (implementing the
org.apache.camel.Processor interface). The target
endpoint is responsible for sending outgoing request messages and receiving incoming
replies. When constructing the route, Fuse Mediation Router creates the appropriate
Producer type based on the component prefix from the endpoint URI.
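The pipeline idea (one exchange object handed from node to node through process()) can be sketched as follows. The Pipeline class, the two intermediate processors, and the route() helper are hypothetical and exist only to show the shape; they are not the Camel implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

final class PipelineSketch {

    static final class Exchange {
        String body;
        Exchange(String body) { this.body = body; }
    }

    interface Processor {
        void process(Exchange exchange);
    }

    /** Passes the same exchange object to each node in order. */
    static final class Pipeline implements Processor {
        private final List<Processor> nodes;

        Pipeline(Processor... nodes) {
            this.nodes = Arrays.asList(nodes);
        }

        @Override
        public void process(Exchange exchange) {
            for (Processor node : nodes) {
                node.process(exchange);
            }
        }
    }

    /** Plays the consumer's role: wraps a message and pushes it down the route. */
    static List<String> route(String incoming) {
        List<String> delivered = new ArrayList<>();
        Processor trim = e -> e.body = e.body.trim();                       // intermediate node
        Processor upper = e -> e.body = e.body.toUpperCase(Locale.ROOT);    // intermediate node
        Processor producer = e -> delivered.add(e.body);                    // target endpoint
        new Pipeline(trim, upper, producer).process(new Exchange(incoming));
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(route("  hello "));
    }
}
```

The last node is an ordinary Processor, which reflects the point made above that a producer is also a processor.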
Auto-discovery is a mechanism that enables you to dynamically add components to your
Fuse Mediation Router application. The component URI prefix is used as a key to load components on
demand. For example, if Fuse Mediation Router encounters the endpoint URI,
activemq://MyQName, and the ActiveMQ endpoint is not yet loaded, Fuse Mediation Router
searches for the component identified by the activemq prefix and dynamically
loads the component.
Before configuring auto-discovery, you must ensure that your custom component classes are accessible from your current classpath. Typically, you bundle the custom component classes into a JAR file, and add the JAR file to your classpath.
To enable auto-discovery of your component, create a Java properties file named after
the component prefix, component-prefix, and store that file in
the following
location:
/META-INF/services/org/apache/camel/component/component-prefix
The component-prefix properties file must contain the following
property setting:
class=component-class-name
Where component-class-name is the fully-qualified name of your
custom component class. You can also define additional system property settings in this
file.
For example, you can enable auto-discovery for the Fuse Mediation Router FTP component by creating the following Java properties file:
/META-INF/services/org/apache/camel/component/ftp
Which contains the following Java property setting:
class=org.apache.camel.component.file.remote.RemoteFileComponent
![]() | Note |
|---|---|
The Java properties file for the FTP component is already defined in the JAR file,
|
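The lookup that auto-discovery performs can be approximated with java.util.Properties and reflection. This is a simplified sketch, not the Fuse Mediation Router implementation: it reads the properties text from a string instead of the classpath location shown above, and GreetingComponent and loadComponent() are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class AutoDiscoverySketch {

    /** Hypothetical component class named by the properties file. */
    static final class GreetingComponent {
        GreetingComponent() { } // no-argument constructor for reflective instantiation
    }

    /** Reads the "class" property and instantiates the class it names. */
    static Object loadComponent(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException("cannot read properties", e);
        }
        String className = props.getProperty("class");
        if (className == null) {
            throw new IllegalArgumentException("missing 'class' property");
        }
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        String fileContents = "class=" + GreetingComponent.class.getName();
        Object component = loadComponent(fileContents);
        System.out.println(component.getClass().getSimpleName());
    }
}
```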
You can add a component by configuring it in the Fuse Mediation Router Spring configuration file,
META-INF/spring/camel-context.xml. To find the component, the component's
URI prefix is matched against the ID attribute of a bean element in the
Spring configuration. If the component prefix matches a bean element ID, Fuse Mediation Router
instantiates the referenced class and injects the properties specified in the Spring
configuration.
Note: This mechanism has priority over auto-discovery. If the CamelContext finds a Spring bean with the requisite ID, it will not attempt to find the component using auto-discovery.
If there are any properties that you want to inject into your component class, define them as bean properties. For example:
public class CustomComponent extends DefaultComponent<CustomExchange> {
    ...
    PropType getProperty() { ... }
    void setProperty(PropType v) { ... }
}
The getProperty() method and the setProperty() method access the
value of property.
To configure a component in Spring, edit the configuration file,
META-INF/spring/camel-context.xml, as shown in Example 5.1.
Example 5.1. Configuring a Component in Spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<package>RouteBuilderPackage</package>
</camelContext>
<bean id="component-prefix" class="component-class-name">
<property name="property" value="propertyValue"/>
</bean>
</beans>
The bean element with ID
component-prefix configures the
component-class-name component. You can inject properties
into the component instance using property elements. For
example, the property element in the preceding example would
inject the value, propertyValue, into the
property property by calling
setProperty() on the component.
Example 5.2 shows an example of how to configure
the Fuse Mediation Router's JMS component by defining a bean element with ID equal to jms.
These settings are added to the Spring configuration file,
camel-context.xml.
Example 5.2. JMS Component Spring Configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<package>org.apache.camel.example.spring</package>
</camelContext>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"
value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
</bean>
</property>
</bean>
</beans>
The | |
The bean element with ID, | |
JMS is just a wrapper for a messaging service. You must specify the concrete
implementation of the messaging system by setting the | |
In this example, the concrete implementation of the JMS messaging service is
Apache ActiveMQ. The |
You implement a new component by extending the
org.apache.camel.impl.DefaultComponent class, which provides some
standard functionality and default implementations for some of the methods. In particular,
the DefaultComponent class provides support for URI parsing and for
creating a scheduled executor (which is used for the scheduled poll
pattern).
The createEndpoint(String uri) method defined in the base
Component interface takes a complete, unparsed endpoint URI
as its sole argument. The DefaultComponent class, on the other hand,
defines a three-argument version of the createEndpoint() method
with the following
signature:
protected abstract Endpoint createEndpoint(
String uri,
String remaining,
Map parameters
)
throws Exception;
The uri argument is the original, unparsed URI; remaining is the part of the URI that remains
after stripping off the component prefix at the start and cutting off the query options at
the end; and parameters contains the parsed query options. It is this version
of the createEndpoint() method that you must override when
inheriting from DefaultComponent. This has the advantage that the
endpoint URI is already parsed for you.
The following sample endpoint URI for
the file component shows how URI parsing works in
practice:
file:///tmp/messages/foo?delete=true&moveNamePostfix=.old
For
this URI, the following arguments are passed to the three-argument version of
createEndpoint():
| Argument | Sample Value |
|---|---|
| uri | file:///tmp/messages/foo?delete=true&moveNamePostfix=.old |
| remaining | /tmp/messages/foo |
| parameters | Two entries are set in |
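To make the split between remaining and parameters concrete, here is a rough sketch of the parsing step. It is a simplification written for this example, not the parser used by DefaultComponent: it keeps all values as strings, does not URL-decode anything, and the method names remaining() and parameters() are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UriParsingSketch {

    /** The part of the URI after the prefix and before the query options. */
    static String remaining(String uri) {
        int colon = uri.indexOf(':');
        int query = uri.indexOf('?');
        String rest = uri.substring(colon + 1, query < 0 ? uri.length() : query);
        // Drop the leading "//" that follows the prefix, if present.
        return rest.startsWith("//") ? rest.substring(2) : rest;
    }

    /** The query options as an ordered name/value map. */
    static Map<String, String> parameters(String uri) {
        Map<String, String> result = new LinkedHashMap<>();
        int query = uri.indexOf('?');
        if (query < 0 || query == uri.length() - 1) {
            return result; // no query options
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, "");
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String uri = "file:///tmp/messages/foo?delete=true&moveNamePostfix=.old";
        System.out.println(remaining(uri));
        System.out.println(parameters(uri));
    }
}
```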
By default, the parameters extracted from the URI query options are injected on the
endpoint's bean properties. The DefaultComponent class automatically
injects the parameters for you.
For example, suppose you want to define a custom
endpoint that supports two URI query options, delete and
moveNamePostfix. All you must do is define the corresponding bean methods
(getters and setters) in the endpoint
class:
public class FileEndpoint extends ScheduledPollEndpoint {
...
public boolean isDelete() {
return delete;
}
public void setDelete(boolean delete) {
this.delete = delete;
}
...
public String getMoveNamePostfix() {
return moveNamePostfix;
}
public void setMoveNamePostfix(String moveNamePostfix) {
this.moveNamePostfix = moveNamePostfix;
}
}
It is also possible to inject URI query options into consumer parameters. For details, see Consumer parameter injection.
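The injection of query options into bean properties can be illustrated with plain reflection. This is a sketch under assumptions, not the mechanism DefaultComponent actually uses: FileEndpointBean is a hypothetical stand-in for an endpoint class, and inject() and convert() handle only String, boolean, and int setters.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

final class ParameterInjectionSketch {

    /** Hypothetical endpoint bean with the two options from the text. */
    public static final class FileEndpointBean {
        private boolean delete;
        private String moveNamePostfix;

        public boolean isDelete() { return delete; }
        public void setDelete(boolean delete) { this.delete = delete; }
        public String getMoveNamePostfix() { return moveNamePostfix; }
        public void setMoveNamePostfix(String postfix) { this.moveNamePostfix = postfix; }
    }

    /** Calls setXxx(value) on the bean for every parameter named xxx. */
    static void inject(Object bean, Map<String, String> parameters) {
        for (Map.Entry<String, String> entry : parameters.entrySet()) {
            String name = entry.getKey();
            if (name.isEmpty()) {
                continue;
            }
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            for (Method method : bean.getClass().getMethods()) {
                if (method.getName().equals(setter) && method.getParameterCount() == 1) {
                    try {
                        method.invoke(bean, convert(entry.getValue(), method.getParameterTypes()[0]));
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("cannot set " + name, e);
                    }
                    break;
                }
            }
        }
    }

    /** Converts the raw option text to the setter's parameter type. */
    static Object convert(String value, Class<?> type) {
        if (type == boolean.class || type == Boolean.class) {
            return Boolean.valueOf(value);
        }
        if (type == int.class || type == Integer.class) {
            return Integer.valueOf(value);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("delete", "true");
        parameters.put("moveNamePostfix", ".old");
        FileEndpointBean endpoint = new FileEndpointBean();
        inject(endpoint, parameters);
        System.out.println(endpoint.isDelete() + " " + endpoint.getMoveNamePostfix());
    }
}
```

Options with no matching setter are silently ignored in this sketch; a real component may prefer to reject them.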
If there are no parameters defined on your Endpoint class, you can optimize
the process of endpoint creation by disabling endpoint parameter injection. To disable
parameter injection on endpoints, override the
useIntrospectionOnEndpoint() method and implement it to return
false, as follows:
protected boolean useIntrospectionOnEndpoint() {
return false;
}
The scheduled executor is used in the scheduled poll pattern, where it is responsible for driving the periodic polling of a consumer endpoint (a scheduled executor is effectively a thread pool implementation).
To instantiate a scheduled executor service, use the
ExecutorServiceStrategy object that is returned by the
CamelContext.getExecutorServiceStrategy() method. For details of the Fuse Mediation Router
threading model, see Threading Model in Implementing Enterprise Integration Patterns.
![]() | Note |
|---|---|
Prior to Fuse Mediation Router 2.3, the |
If you want to validate the URI before creating an endpoint instance, you can override
the validateURI() method from the
DefaultComponent class, which has the following signature:
protected void validateURI(String uri,
String path,
Map parameters)
throws ResolveEndpointFailedException;
If the supplied URI does not have the required format, the implementation of
validateURI() should throw the
org.apache.camel.ResolveEndpointFailedException
exception.
Example 6.2 outlines how to implement the
DefaultComponent.createEndpoint() method, which is responsible
for creating endpoint instances on demand.
The | |
When extending | |
Create an instance of your custom endpoint type,
|
Example 6.3 shows a sample implementation of a
FileComponent class.
Example 6.3. FileComponent Implementation
package org.apache.camel.component.file;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
import java.io.File;
import java.util.Map;
public class FileComponent extends DefaultComponent {
public static final String HEADER_FILE_NAME = "org.apache.camel.file.name";
public FileComponent() {
}
public FileComponent(CamelContext context) {
super(context);
}
protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
File file = new File(remaining);
FileEndpoint result = new FileEndpoint(file, uri, this);
return result;
}
}
Always define a no-argument constructor for the component class in order to facilitate automatic instantiation of the class.
A constructor that takes the parent | |
The implementation of the |
To implement a Fuse Mediation Router component, you must implement the
org.apache.camel.Component interface. An instance of
Component type provides the entry point into a custom component. That is, all
of the other objects in a component are ultimately accessible through the
Component instance. Figure 6.1 shows
the relevant Java interfaces and classes that make up the Component inheritance
hierarchy.
Example 6.1 shows the definition of the
org.apache.camel.Component interface.
The Component interface defines the following
methods:
getCamelContext() and
setCamelContext()—References the
CamelContext to which this Component
belongs. The setCamelContext() method is automatically called
when you add the component to a CamelContext.
createEndpoint()—The factory method that gets called
to create Endpoint instances for this component. The
uri parameter is the endpoint URI, which contains the details
required to create the endpoint.
You can implement a consumer in one of the following ways:
In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.
Example 8.3 shows the implementation of the
JMXConsumer class, which is taken from the Fuse Mediation Router JMX component
implementation. The JMXConsumer class is an example of an event-driven
consumer, which is implemented by inheriting from the
org.apache.camel.impl.DefaultConsumer class. In the case of the
JMXConsumer example, events are represented by calls on the
NotificationListener.handleNotification() method, which is a
standard way of receiving JMX events. In order to receive these JMX events, it is necessary
to implement the NotificationListener interface and override
the handleNotification() method, as shown in Example 8.3.
Example 8.3. JMXConsumer Implementation
package org.apache.camel.component.jmx;
import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
public class JMXConsumer extends DefaultConsumer implements NotificationListener {
JMXEndpoint jmxEndpoint;
public JMXConsumer(JMXEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.jmxEndpoint = endpoint;
}
public void handleNotification(Notification notification, Object handback) {
try {
getProcessor().process(jmxEndpoint.createExchange(notification));
} catch (Throwable e) {
handleException(e);
}
}
}
The | ||||
You must implement at least one constructor that takes a reference to the parent
endpoint, | ||||
The
| ||||
This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in Fuse Mediation Router. Then the newly created exchange object is passed to the next processor in the route (invoked synchronously). | ||||
The |
In a scheduled poll consumer, polling events are automatically generated by a timer
class, java.util.concurrent.ScheduledExecutorService. To receive the generated
polling events, you must implement the ScheduledPollConsumer.poll() method (see
Consumer Patterns and Threading).
Example 8.4 shows how to implement a
consumer that follows the scheduled poll pattern, which is implemented by extending the
ScheduledPollConsumer class.
Example 8.4. ScheduledPollConsumer Implementation
import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;

public class CustomConsumer extends ScheduledPollConsumer {
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception {
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor.
        getProcessor().process(exchange);
    }

    @Override
    protected void doStart() throws Exception {
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception {
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
Implement a scheduled poll consumer class,
| |
You must implement at least one constructor that takes a reference to the parent
endpoint, | |
Override the | |
In this example, the event is processed synchronously. If you want to process events
asynchronously, you should use a reference to an asynchronous processor instead, by
calling | |
(Optional) If you want some lines of code to execute as the
consumer is starting up, override the | |
(Optional) If you want some lines of code to execute as the
consumer is stopping, override the |
Example 8.5 outlines how to implement a
consumer that follows the polling pattern, which is implemented by extending the
PollingConsumerSupport class.
Example 8.5. PollingConsumerSupport Implementation
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class CustomConsumer extends PollingConsumerSupport {
    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() {
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() {
        // Blocking poll ...
    }

    public Exchange receive(long timeout) {
        // Poll with timeout ...
    }

    protected void doStart() throws Exception {
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
Implement your polling consumer class, | |
You must implement at least one constructor that takes a reference to the parent
endpoint, | |
The | |
The | |
The | |
If you want to insert code that executes while a consumer is starting up or shutting
down, implement the |
If the standard consumer patterns are not suitable for your consumer implementation, you
can implement the Consumer interface directly and write the threading code
yourself. When writing the threading code, however, it is important that you comply with the
standard Fuse Mediation Router threading model, as described in Threading Model in Implementing Enterprise Integration Patterns.
For example, the SEDA component from camel-core implements its own consumer
threading, which is consistent with the Fuse Mediation Router threading model. Example 8.6 shows an outline of how the
SedaConsumer class implements its threading.
Example 8.6. Custom Threading Implementation
package org.apache.camel.component.seda;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Consumer for the SEDA component.
*
* @version $Revision: 922485 $
*/
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware {
private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
private SedaEndpoint endpoint;
private Processor processor;
private ExecutorService executor;
...
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
this.processor = processor;
}
...
public void run() {
BlockingQueue<Exchange> queue = endpoint.getQueue();
// Poll the queue and process exchanges
...
}
...
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
executor = endpoint.getCamelContext().getExecutorServiceStrategy()
.newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
for (int i = 0; i < poolSize; i++) {
executor.execute(this);
}
endpoint.onStarted(this);
}
protected void doStop() throws Exception {
endpoint.onStopped(this);
// must shutdown executor on stop to avoid overhead of having them running
endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor);
executor = null;
if (multicast != null) {
ServiceHelper.stopServices(multicast);
}
}
...
//----------
// Implementation of ShutdownAware interface
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// deny stopping on shutdown as we want seda consumers to run in case some other queues
// depend on this consumer to run, so it can complete its exchanges
return true;
}
public int getPendingExchangesSize() {
// number of pending messages on the queue
return endpoint.getQueue().size();
}
}
The | |
Implement the | |
The | |
Instead of creating threads directly, you should create a thread pool using the
For details, see Threading Model in Implementing Enterprise Integration Patterns. | |
Kick off the threads by calling the | |
The | |
Shut down the thread pool, which is represented by the |