As a consequence of its position at the start of a route, the consumer plays an especially important role. Many important features of the route are determined by the consumer. For example, the consumer gets to determine the threading model for processing the exchanges that pass through the route. The consumer is also responsible for determining the format of incoming request messages.
In order to accommodate different kinds of threading models for processing incoming requests, Mediation Router supports a variety of different consumer implementation patterns: the event-driven pattern allows the consumer to be driven by an external thread; the scheduled poll pattern creates a dedicated thread pool to drive the consumer; and the polling pattern leaves the threading model undefined.
You can implement a consumer based on one of the following patterns:
In the event-driven pattern, processing of an incoming request is initiated when
another part of the application (typically a third-party library) calls a method
implemented by the consumer. A good example of an event-driven consumer is the
Mediation Router JMX component, where events are initiated by the JMX library, which calls
the handleNotification() method to initiate request processing—see
Example 7.1 for details.
Figure 4.3 shows an outline of the
event-driven consumer pattern. In this example, it is assumed that processing is triggered
by a call to the notify() method.
The event-driven consumer processes incoming requests as follows:
The consumer must implement a method to receive the incoming event (in the figure,
this is represented by the notify() method).
The thread that calls notify() is normally a
separate part of the application. Hence, the consumer's threading policy is externally
driven.
For example, in the case of the JMX consumer implementation, the consumer
implements the NotificationListener.handleNotification() method in order
to receive notifications from JMX. The threads that drive the consumer processing are
created within the JMX layer.
In the body of the notify() method, the
consumer first converts the incoming event into an exchange object, E,
and then calls process() on the next processor in the route, passing the
exchange object as its argument.
In the scheduled poll pattern, the consumer retrieves incoming requests by checking at
regular time intervals whether or not a request has arrived. Checking for requests is
scheduled automatically by a built-in timer class, the scheduled executor
service, which is a standard pattern provided by the
java.util.concurrent library. The scheduled executor service is capable of
executing a particular task at timed intervals and it also manages a pool of threads,
which it uses to run the task instances.
Figure 4.4 shows an outline of the scheduled poll consumer pattern.
The scheduled poll consumer processes incoming requests as follows:
The scheduled executor service has a pool of threads at its disposal, which it can
use to initiate consumer processing. After each scheduled time interval has elapsed,
the scheduled executor service tries to get hold of a free thread from its pool (there
are five threads in the pool by default). If a free thread is available, it uses the
thread to call the poll() method on the consumer.
The consumer's poll() method is intended to trigger processing of an
incoming request. In the body of the poll() method, the consumer should
attempt to retrieve an incoming message. If no request is available, the
poll() method should return right away.
If a request message is available, the consumer inserts it into an exchange object
and then calls process() on the next processor in the route, passing the
exchange object as its argument.
In the polling pattern, processing of an incoming request is initiated when a
third-party calls one of the consumer's polling methods, receive(),
receiveNoWait(), and receive(long timeout). In general, it is
up to the component implementation to define the precise mechanism for initiating calls on
the polling methods. This mechanism is not specified by the polling pattern.
Figure 4.5 shows an outline of the polling consumer pattern.
The polling consumer processes incoming requests as follows:
Processing of an incoming request is initiated whenever one of the consumer's
polling methods (receive(), receiveNoWait(), or
receive(long timeout)) are called. The mechanism for calling these
polling methods is implementation defined.
In the body of the receive() method, the consumer attempts to
retrieve an incoming request message. If no message is currently available, the
behavior depends on which receive method was called: if the method is
receiveNoWait(), return immediately; if the method is
receive(long timeout), wait for the specified timeout (usually
specified in milliseconds) before returning; and if the method is
receive(), wait until a message is received (possibly
indefinitely).
If a request message is available, the consumer inserts it into an exchange object
and then calls process() on the next processor in the route, passing the
exchange object as its argument.