|
||||||||||
| PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
| SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD | |||||||||
java.lang.Objectorg.apache.servicemix.nmr.core.ServiceRegistryImpl<ClusterRegistration>
org.apache.servicemix.jbi.cluster.engine.ClusterEngine
public class ClusterEngine
Throttling
==========
As the processing of consumed JMS messages is performed asynchronously, in a case
where there are lots of requests pending in the queue, we need to limit the number
of exchanges sent into the NMR at a given time so that it does not end in out of
memory errors. In order to do so, the maxPendingExchanges property can
be configured. The cluster endpoint keeps track of the number of exchange that it has
send and which are not fully processed. If the maximum number is reached, it will stop
the consumption of new requests until that number comes back below the threshold.
The default value is 4096.
Use of JMS selectors
====================
In order for this endpoint to be used to cluster several services, and in order
to use a minimum amount of JMS destinations, the cluster endpoint uses selector to
listen to exchanges it is interested in.
Another requirement is that the response for a given JMS message is consumed only
by the cluster endpoint that originally sent the JMS request.
Therefore, we use the name property of this endpoint (which has to be
unique in the cluster) as a JMS property on the messages. The endpoint acting as a
JBI provider will create a JMS message and set this property. It will only consume
messages that have the needed value for the cluster name, hence only consuming his
own responses. On the JBI consumer side of the endpoint, the endpoint will consume
JMS messages of two kinds: new requests (containing an IN message and a JBI target
that is available on the container), and also replies targeted to this cluster endpoint.
Also, the maxPendingExchanges on the JBI consumer side has an effect
here. When the number of requests that have been sent into the NMR but not fully
processed yet reaches the above value, the JMS consumer will not consume messages
corresponding to new requests, but will only service replies. This behavior will
remain until some pending exchanges are processed.
Transactions
============
Four transactional modesl are define: None, ClientAck, Jms and Xa. The first one
will use JMS session in auto acknowledge mode, which means there will be no redelivery
at all. The ClientAck mode will use client acknowledgements on the JMS
sessions and will send the ack when the exchange comes back. The Jms mode
uses JMS local transactions, and the Xa uses XA transactions.
The behavior of the cluster endpoint when receiving back the exchange is also
controlled by the rollbackOnErrors flag. If this flag is set and the
transaction model is not None, any exchange that comes back in an
Error status will lead to the transaction being rolled back (or the ack
not sent). This also means that no error status can be conveyed back to the original
exchange.
TODO: this should be configured on a per endpoint basis
Using multiple clustered endpoints:
If/when this cluster endpoint is used to cluster multiple endpoints
(i.e. an interceptor is used to redirect exchanges to this endpoint),
and if the containers do not have the exact same set of endpoints deployed,
a jms consumer may consume JMS messages targeted to an endpoint which happen
to not be available in this container. In such a case, the jms consumer will
use a JMS selector to ensure it will only consume JMS messages it will be able
to process. This is of course a bit more CPU intensive on the broker.
When this cluster endpoint is used in an explicit wiring between two endpoints,
or if the container are supposed to have the exact same set of endpoints available,
the assumeSameContainers flag can be turned on, which will disable the
use of JMS selectors.
Transactions support
Transactions are supported at the JMS consumer level. In JMS, such transactions
will usually involve receiving a JMS message and sending a response back in the
same transaction. Transactions are supported if the JBI statuses are conveyed back
but in such cases, the transaction will never be rolled back. It thus means
it will provide a guarantee if the container is shut down, but will not offer a
redelivery mechanism in case something went wrong while processing the message.
TODO: handle XA transactions
TODO: transactions are not handled on the provider side
Conveying JBI status back
The boolean property conveyJbiStatus can be used to
control whether the JBI statuses (DONE and ERROR) are always sent back
across the JMS layer. It changes the number of JMS messages sent
for a given JBI exchange and also change the transactions semantics.
When JBI statuses are always sent, an InOnly request will be translated
into two JMS messages (one for the In message and another one for the
DONE or ERROR status or three for an InOnly MEP.
This means that the transaction (if any) will not be rolled back if the
exchange comes back with an ERROR status. On the opposite, if JBI statuses
are not sent back, a single JMS message will be
used: if the exchange comes back with an ERROR, the transaction on the JMS
consumer side will be rolled back and the message redelivered; nothing will
be reported back to the JMS producer side.
Note that this behavior (not conveying JBI statuses and rolling back the
transaction) is only available when using transactions as the message need
to be redelivered by the JMS broker.
Therefore the InOnly exchange DONE status will be sent immediately after the
JMS message has been sent. For a RobustInOnly, the JMS consumer will sent back
a fault or a DONE status, but an ERROR status will cause the transaction to be
rolled back and the message redelivered.
For an InOut MEP, either two or three messages will be used if JBI statuses
are conveyed or not.
TODO: what about InOptionalOut which allows a fault to be sent back from the
JBI consumer to the provider after receiving the out message ?
TODO: update doc with rollbackOnErrors
this flag is only used on the JMS consumer side and sent in the JMS message
for the other side to know how to handle the message
TODO: the rollbackOnErrors flag should be configured on a per-endpoint basis
TODO: add a cache level
not caching the connection would only work when using non temporary
queues for the reply destination
TODO: handle JMS exceptions => refresh connection
note that refreshing the connection when using temporary queues
would lead to loosing messages in the temp queue
TODO: simplify the selectors when a single endpoint is clustered
| Field Summary | |
|---|---|
protected Channel |
channel
|
static int |
DEFAULT_MAX_PENDING_EXCHANGES
Default maximum number of pending exchanges |
protected Map<String,Exchange> |
exchanges
|
protected static String |
JBI_ENDPOINT
JMS property holding the endpoint for the exchange |
protected static String |
JBI_INTERFACE
JMS property holding the interface QName for the exchange |
protected static String |
JBI_MEP
JMS property holding the Message Exchange Pattern |
protected static String |
JBI_MESSAGE
Name of the JMS property holding the type of message sent |
protected static int |
JBI_MESSAGE_DONE
The JMS message contains a JBI DONE status |
protected static int |
JBI_MESSAGE_ERROR
The JMS message contains a JBI ERROR status |
protected static int |
JBI_MESSAGE_FAULT
The JMS message contains a JBI Fault message |
protected static int |
JBI_MESSAGE_IN
The JMS message contains a JBI In message |
protected static int |
JBI_MESSAGE_OUT
The JMS message contains a JBI Out message |
protected static String |
JBI_OPERATION
JMS property holding the operation QName for the exchange |
protected static String |
JBI_SERVICE
JMS property holding the service QName for the exchange |
protected org.slf4j.Logger |
logger
|
protected int |
maxPendingExchanges
|
protected String |
name
|
protected AtomicBoolean |
pauseConsumption
|
protected AtomicInteger |
pendingExchanges
|
protected JmsRequestorPool |
pool
|
protected static String |
PROPERTY_CLUSTER_NAME
JMS property holding the name of the cluster (used with selectors) |
protected static String |
PROPERTY_CORR_ID
JMS property holding the correlation id |
protected static String |
PROPERTY_ROLLBACK_ON_ERRORS
JMS property containing the rollbackOnErrors flag for this message. |
protected static String |
PROPERTY_SENDER_CLUSTER_NAME
JMS property holding the name of the cluster to use for the response |
protected static String |
PROPERTY_SENDER_CORR_ID
JMS property holding the correlation id to be used for the reply. |
protected boolean |
rollbackOnErrors
|
protected String |
selector
|
protected AtomicBoolean |
started
|
| Fields inherited from interface org.apache.servicemix.nmr.api.Endpoint |
|---|
CHANNEL_SYNC_DELIVERY, ENDPOINT_NAME, INTERFACE_NAME, NAME, RUN_AS_SUBJECT, SERVICE_NAME, UNTARGETABLE, VERSION, WSDL_URL |
| Constructor Summary | |
|---|---|
ClusterEngine()
|
|
| Method Summary | |
|---|---|
void |
afterPropertiesSet()
|
protected void |
decrementPendingExchangeIfNeeded(Exchange exchange)
|
void |
destroy()
|
protected void |
done(Exchange exchange)
|
void |
endpointRegistered(InternalEndpoint endpoint)
An endpoint has been registered |
void |
endpointUnregistered(InternalEndpoint endpoint)
An endpoint has been unregistered |
void |
exchangeDelivered(Exchange exchange)
Method called each time an exchange is delivered |
void |
exchangeFailed(Exchange exchange)
Method called when an exchange resulted in an exception to be thrown and the exchange not delivered. |
void |
exchangeSent(Exchange exchange)
Method called each time an exchange is sent |
protected void |
fail(Exchange exchange,
Exception e)
|
protected List<javax.jbi.servicedesc.ServiceEndpoint> |
getAllEndpoints()
|
Channel |
getChannel()
|
protected javax.jbi.servicedesc.ServiceEndpoint |
getEndpoint(QName serviceName,
String endpointName)
|
int |
getMaxPendingExchanges()
|
String |
getName()
|
JmsRequestorPool |
getPool()
|
protected String |
getSelector()
|
void |
invalidateSelector()
|
boolean |
isRollbackOnErrors()
|
void |
pause()
|
void |
process(Exchange exchange)
Process the given exchange. |
protected void |
process(JmsRequestor requestor)
Process a JMS message |
protected void |
processExchange(JmsRequestor requestor,
Exchange exchange)
Process a JBI exchange |
void |
resume()
|
protected void |
send(Exchange exchange)
|
void |
setChannel(Channel channel)
Set the channel so that the endpoint can send exchanges back when they are processed or act as a consumer itself. |
void |
setMaxPendingExchanges(int maxPendingExchanges)
Specifies the maximum number of pending exchanges on the JMS consumer side. |
void |
setName(String name)
A unique name for this cluster endpoint. |
void |
setPool(JmsRequestorPool pool)
|
void |
setRollbackOnErrors(boolean rollbackOnErrors)
|
void |
start()
|
protected Message |
unmarshallMessage(javax.jms.Message message)
|
| Methods inherited from class org.apache.servicemix.nmr.core.ServiceRegistryImpl |
|---|
doRegister, doUnregister, getProperties, getServices, register, unregister |
| Methods inherited from class java.lang.Object |
|---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
public static final int DEFAULT_MAX_PENDING_EXCHANGES
protected static final String JBI_MESSAGE
protected static final int JBI_MESSAGE_IN
protected static final int JBI_MESSAGE_OUT
protected static final int JBI_MESSAGE_FAULT
protected static final int JBI_MESSAGE_DONE
protected static final int JBI_MESSAGE_ERROR
protected static final String JBI_MEP
protected static final String JBI_INTERFACE
protected static final String JBI_OPERATION
protected static final String JBI_SERVICE
protected static final String JBI_ENDPOINT
protected static final String PROPERTY_CORR_ID
protected static final String PROPERTY_SENDER_CORR_ID
protected static final String PROPERTY_ROLLBACK_ON_ERRORS
protected static final String PROPERTY_CLUSTER_NAME
protected static final String PROPERTY_SENDER_CLUSTER_NAME
protected final org.slf4j.Logger logger
protected boolean rollbackOnErrors
protected String name
protected JmsRequestorPool pool
protected Channel channel
protected AtomicBoolean started
protected final Map<String,Exchange> exchanges
protected String selector
protected AtomicInteger pendingExchanges
protected AtomicBoolean pauseConsumption
protected int maxPendingExchanges
| Constructor Detail |
|---|
public ClusterEngine()
| Method Detail |
|---|
public Channel getChannel()
public void setChannel(Channel channel)
Endpoint
setChannel in interface Endpointchannel - the channel that this endpoint can useEndpointRegistry.register(Endpoint, java.util.Map)public JmsRequestorPool getPool()
public void setPool(JmsRequestorPool pool)
public boolean isRollbackOnErrors()
public void setRollbackOnErrors(boolean rollbackOnErrors)
public String getName()
public void setName(String name)
name - public int getMaxPendingExchanges()
public void setMaxPendingExchanges(int maxPendingExchanges)
maxPendingExchanges -
public void afterPropertiesSet()
throws Exception
Exception
public void start()
throws Exception
Exception
public void destroy()
throws Exception
Exceptionpublic void pause()
public void resume()
public void endpointRegistered(InternalEndpoint endpoint)
EndpointListener
endpointRegistered in interface EndpointListenerendpoint - the registered endpointpublic void endpointUnregistered(InternalEndpoint endpoint)
EndpointListener
endpointUnregistered in interface EndpointListenerendpoint - the unregistered endpointpublic void exchangeSent(Exchange exchange)
ExchangeListener
exchangeSent in interface ExchangeListenerexchange - the exchange sentpublic void exchangeDelivered(Exchange exchange)
ExchangeListener
exchangeDelivered in interface ExchangeListenerexchange - the delivered exchangepublic void exchangeFailed(Exchange exchange)
ExchangeListener
exchangeFailed in interface ExchangeListenerexchange - the exchange that failedpublic void invalidateSelector()
protected String getSelector()
protected javax.jbi.servicedesc.ServiceEndpoint getEndpoint(QName serviceName,
String endpointName)
protected List<javax.jbi.servicedesc.ServiceEndpoint> getAllEndpoints()
protected void done(Exchange exchange)
protected void fail(Exchange exchange,
Exception e)
protected void send(Exchange exchange)
public void process(Exchange exchange)
Endpoint
process in interface Endpointexchange - the exchange to process
protected void process(JmsRequestor requestor)
throws javax.jms.JMSException
requestor - the item to use
javax.jms.JMSException - if an error occur
protected void processExchange(JmsRequestor requestor,
Exchange exchange)
throws Exception
requestor - the item to useexchange - the exchange to process
Exception - if an error occurprotected void decrementPendingExchangeIfNeeded(Exchange exchange)
protected Message unmarshallMessage(javax.jms.Message message)
throws javax.jms.JMSException
javax.jms.JMSException
|
||||||||||
| PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
| SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD | |||||||||