org.apache.servicemix.jbi.cluster.engine
Class ClusterEngine

java.lang.Object
  extended by org.apache.servicemix.nmr.core.ServiceRegistryImpl<ClusterRegistration>
      extended by org.apache.servicemix.jbi.cluster.engine.ClusterEngine
All Implemented Interfaces:
EventListener, Endpoint, EndpointListener, ExchangeListener, Listener, ServiceRegistry<ClusterRegistration>

public class ClusterEngine
extends ServiceRegistryImpl<ClusterRegistration>
implements Endpoint, EndpointListener, ExchangeListener

Throttling
==========
Because consumed JMS messages are processed asynchronously, a queue holding many pending requests could flood the NMR with exchanges and lead to out-of-memory errors. To avoid this, the maxPendingExchanges property limits the number of exchanges sent into the NMR at a given time. The cluster endpoint keeps track of the number of exchanges that it has sent and that are not yet fully processed. When the maximum is reached, it stops consuming new requests until that number drops back below the threshold. The default value is 4096.

Use of JMS selectors
====================
So that this endpoint can cluster several services while using a minimum number of JMS destinations, the cluster endpoint uses a selector to listen only for the exchanges it is interested in. Another requirement is that the response to a given JMS message is consumed only by the cluster endpoint that originally sent the JMS request. Therefore, the name property of this endpoint (which has to be unique in the cluster) is used as a JMS property on the messages. The endpoint acting as a JBI provider creates a JMS message and sets this property. It only consumes messages carrying its own cluster name, and hence only its own responses.

On the JBI consumer side, the endpoint consumes two kinds of JMS messages: new requests (containing an IN message and a JBI target that is available in the container) and replies targeted at this cluster endpoint. The maxPendingExchanges property also has an effect here. When the number of requests that have been sent into the NMR but not yet fully processed reaches that value, the JMS consumer stops consuming messages that correspond to new requests and only services replies. This behavior lasts until some pending exchanges have been processed.

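The selector logic described above can be sketched as follows. This is a minimal illustration, not the actual ClusterEngine implementation: the class ClusterSelectorSketch, the property names ClusterName and JbiEndpoint, and the rule used to recognize new requests (no cluster name set on the message) are all assumptions made for the example. The real values of PROPERTY_CLUSTER_NAME and JBI_ENDPOINT are not shown on this page.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch; not the real ClusterEngine code.
final class ClusterSelectorSketch {
    // Assumed property names; the actual constant values are not documented here.
    static final String CLUSTER_NAME = "ClusterName";
    static final String ENDPOINT = "JbiEndpoint";

    // JMS selector string literals use single quotes; an embedded quote is doubled.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Matches only replies addressed to the cluster endpoint with the given name.
    static String replySelector(String name) {
        return CLUSTER_NAME + " = " + quote(name);
    }

    // Replies are always accepted; new requests are accepted only when
    // consumption is not paused, and only for locally available endpoints.
    static String selector(String name, List<String> localEndpoints, boolean paused) {
        String replies = replySelector(name);
        if (paused || localEndpoints.isEmpty()) {
            return replies;
        }
        String endpoints = localEndpoints.stream()
                .map(ClusterSelectorSketch::quote)
                .collect(Collectors.joining(", "));
        // Assumption: a new request carries no cluster name yet.
        String requests = CLUSTER_NAME + " IS NULL AND " + ENDPOINT + " IN (" + endpoints + ")";
        return "(" + replies + ") OR (" + requests + ")";
    }

    public static void main(String[] args) {
        List<String> endpoints = Arrays.asList("ep1", "ep2");
        System.out.println(selector("node-1", endpoints, false));
        System.out.println(selector("node-1", endpoints, true));
    }
}
```

When the paused flag is set, the sketch falls back to the reply-only selector, which mirrors the statement that a throttled consumer only services replies.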
Transactions
============
Four transactional models are defined: None, ClientAck, Jms and Xa. None uses JMS sessions in auto-acknowledge mode, which means there is no redelivery at all. ClientAck uses client acknowledgements on the JMS sessions and sends the ack when the exchange comes back. Jms uses JMS local transactions, and Xa uses XA transactions.

The behavior of the cluster endpoint when it receives the exchange back is also controlled by the rollbackOnErrors flag. If this flag is set and the transaction model is not None, any exchange that comes back with an ERROR status causes the transaction to be rolled back (or the ack not to be sent). This also means that no ERROR status can be conveyed back to the original exchange.

TODO: this should be configured on a per endpoint basis

Using multiple clustered endpoints
==================================
If this cluster endpoint is used to cluster multiple endpoints (i.e. an interceptor is used to redirect exchanges to this endpoint), and the containers do not have the exact same set of endpoints deployed, a JMS consumer could consume JMS messages targeted at an endpoint that is not available in its container. To prevent this, the JMS consumer uses a JMS selector so that it only consumes JMS messages it is able to process. This is of course a bit more CPU intensive on the broker. When this cluster endpoint is used in an explicit wiring between two endpoints, or if the containers are supposed to have the exact same set of endpoints available, the assumeSameContainers flag can be turned on, which disables the use of JMS selectors.

Transactions support
====================
Transactions are supported at the JMS consumer level. In JMS, such transactions usually involve receiving a JMS message and sending a response back in the same transaction. Transactions are also supported when the JBI statuses are conveyed back, but in that case the transaction is never rolled back. This provides a guarantee if the container is shut down, but no redelivery mechanism if something goes wrong while processing the message.

TODO: handle XA transactions
TODO: transactions are not handled on the provider side

Conveying JBI status back
=========================
The boolean property conveyJbiStatus controls whether the JBI statuses (DONE and ERROR) are always sent back across the JMS layer. It changes the number of JMS messages sent for a given JBI exchange and also changes the transaction semantics.

When JBI statuses are always sent, an InOnly request is translated into two JMS messages (one for the In message and another for the DONE or ERROR status), and an InOut MEP into three. This means that the transaction (if any) is not rolled back if the exchange comes back with an ERROR status.

Conversely, if JBI statuses are not sent back, a single JMS message is used: if the exchange comes back with an ERROR, the transaction on the JMS consumer side is rolled back and the message redelivered; nothing is reported back to the JMS producer side. Note that this behavior (not conveying JBI statuses and rolling back the transaction) is only available when using transactions, as the message needs to be redelivered by the JMS broker. Consequently, the DONE status of the InOnly exchange is sent immediately after the JMS message has been sent. For a RobustInOnly, the JMS consumer sends back a fault or a DONE status, but an ERROR status causes the transaction to be rolled back and the message redelivered. For an InOut MEP, three messages are used if JBI statuses are conveyed and two if they are not.

TODO: what about InOptionalOut, which allows a fault to be sent back from the JBI consumer to the provider after receiving the out message?
TODO: update doc with rollbackOnErrors; this flag is only used on the JMS consumer side and sent in the JMS message so that the other side knows how to handle the message
TODO: the rollbackOnErrors flag should be configured on a per-endpoint basis
TODO: add a cache level; not caching the connection would only work when using non-temporary queues for the reply destination
TODO: handle JMS exceptions => refresh connection; note that refreshing the connection when using temporary queues would lead to losing messages in the temp queue
TODO: simplify the selectors when a single endpoint is clustered
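The message counts given in the description of conveyJbiStatus can be summarized with a small sketch. It assumes that an InOut exchange uses three JMS messages when statuses are conveyed (In, Out, status) and two otherwise, and it leaves out RobustInOnly and InOptionalOut because the description does not give exact counts for them. The class and enum names are hypothetical.

```java
// Hypothetical sketch of the JMS message counts per JBI exchange.
final class JmsMessageCountSketch {
    // Only the two patterns with counts stated in the description.
    enum Mep { IN_ONLY, IN_OUT }

    // Number of JMS messages used for one JBI exchange.
    static int jmsMessages(Mep mep, boolean conveyJbiStatus) {
        // An InOnly carries one payload message (In), an InOut two (In and Out).
        int payloadMessages = (mep == Mep.IN_OUT) ? 2 : 1;
        // Conveying the final DONE or ERROR status costs one extra message.
        return conveyJbiStatus ? payloadMessages + 1 : payloadMessages;
    }

    public static void main(String[] args) {
        for (Mep mep : Mep.values()) {
            System.out.println(mep + " conveyed=" + jmsMessages(mep, true)
                    + " notConveyed=" + jmsMessages(mep, false));
        }
    }
}
```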


Field Summary
protected  Channel channel
           
static int DEFAULT_MAX_PENDING_EXCHANGES
          Default maximum number of pending exchanges
protected  Map<String,Exchange> exchanges
           
protected static String JBI_ENDPOINT
          JMS property holding the endpoint for the exchange
protected static String JBI_INTERFACE
          JMS property holding the interface QName for the exchange
protected static String JBI_MEP
          JMS property holding the Message Exchange Pattern
protected static String JBI_MESSAGE
          Name of the JMS property holding the type of message sent
protected static int JBI_MESSAGE_DONE
          The JMS message contains a JBI DONE status
protected static int JBI_MESSAGE_ERROR
          The JMS message contains a JBI ERROR status
protected static int JBI_MESSAGE_FAULT
          The JMS message contains a JBI Fault message
protected static int JBI_MESSAGE_IN
          The JMS message contains a JBI In message
protected static int JBI_MESSAGE_OUT
          The JMS message contains a JBI Out message
protected static String JBI_OPERATION
          JMS property holding the operation QName for the exchange
protected static String JBI_SERVICE
          JMS property holding the service QName for the exchange
protected  org.slf4j.Logger logger
           
protected  int maxPendingExchanges
           
protected  String name
           
protected  AtomicBoolean pauseConsumption
           
protected  AtomicInteger pendingExchanges
           
protected  JmsRequestorPool pool
           
protected static String PROPERTY_CLUSTER_NAME
          JMS property holding the name of the cluster (used with selectors)
protected static String PROPERTY_CORR_ID
          JMS property holding the correlation id
protected static String PROPERTY_ROLLBACK_ON_ERRORS
          JMS property containing the rollbackOnErrors flag for this message.
protected static String PROPERTY_SENDER_CLUSTER_NAME
          JMS property holding the name of the cluster to use for the response
protected static String PROPERTY_SENDER_CORR_ID
          JMS property holding the correlation id to be used for the reply.
protected  boolean rollbackOnErrors
           
protected  String selector
           
protected  AtomicBoolean started
           
 
Fields inherited from interface org.apache.servicemix.nmr.api.Endpoint
CHANNEL_SYNC_DELIVERY, ENDPOINT_NAME, INTERFACE_NAME, NAME, RUN_AS_SUBJECT, SERVICE_NAME, UNTARGETABLE, VERSION, WSDL_URL
 
Constructor Summary
ClusterEngine()
           
 
Method Summary
 void afterPropertiesSet()
           
protected  void decrementPendingExchangeIfNeeded(Exchange exchange)
           
 void destroy()
           
protected  void done(Exchange exchange)
           
 void endpointRegistered(InternalEndpoint endpoint)
          An endpoint has been registered
 void endpointUnregistered(InternalEndpoint endpoint)
          An endpoint has been unregistered
 void exchangeDelivered(Exchange exchange)
          Method called each time an exchange is delivered
 void exchangeFailed(Exchange exchange)
          Method called when an exchange resulted in an exception being thrown and was not delivered.
 void exchangeSent(Exchange exchange)
          Method called each time an exchange is sent
protected  void fail(Exchange exchange, Exception e)
           
protected  List<javax.jbi.servicedesc.ServiceEndpoint> getAllEndpoints()
           
 Channel getChannel()
           
protected  javax.jbi.servicedesc.ServiceEndpoint getEndpoint(QName serviceName, String endpointName)
           
 int getMaxPendingExchanges()
           
 String getName()
           
 JmsRequestorPool getPool()
           
protected  String getSelector()
           
 void invalidateSelector()
           
 boolean isRollbackOnErrors()
           
 void pause()
           
 void process(Exchange exchange)
          Process the given exchange.
protected  void process(JmsRequestor requestor)
          Process a JMS message
protected  void processExchange(JmsRequestor requestor, Exchange exchange)
          Process a JBI exchange
 void resume()
           
protected  void send(Exchange exchange)
           
 void setChannel(Channel channel)
          Set the channel so that the endpoint can send exchanges back when they are processed or act as a consumer itself.
 void setMaxPendingExchanges(int maxPendingExchanges)
          Specifies the maximum number of pending exchanges on the JMS consumer side.
 void setName(String name)
          A unique name for this cluster endpoint.
 void setPool(JmsRequestorPool pool)
           
 void setRollbackOnErrors(boolean rollbackOnErrors)
           
 void start()
           
protected  Message unmarshallMessage(javax.jms.Message message)
           
 
Methods inherited from class org.apache.servicemix.nmr.core.ServiceRegistryImpl
doRegister, doUnregister, getProperties, getServices, register, unregister
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_MAX_PENDING_EXCHANGES

public static final int DEFAULT_MAX_PENDING_EXCHANGES
Default maximum number of pending exchanges

See Also:
Constant Field Values

JBI_MESSAGE

protected static final String JBI_MESSAGE
Name of the JMS property holding the type of message sent

See Also:
Constant Field Values

JBI_MESSAGE_IN

protected static final int JBI_MESSAGE_IN
The JMS message contains a JBI In message

See Also:
Constant Field Values

JBI_MESSAGE_OUT

protected static final int JBI_MESSAGE_OUT
The JMS message contains a JBI Out message

See Also:
Constant Field Values

JBI_MESSAGE_FAULT

protected static final int JBI_MESSAGE_FAULT
The JMS message contains a JBI Fault message

See Also:
Constant Field Values

JBI_MESSAGE_DONE

protected static final int JBI_MESSAGE_DONE
The JMS message contains a JBI DONE status

See Also:
Constant Field Values

JBI_MESSAGE_ERROR

protected static final int JBI_MESSAGE_ERROR
The JMS message contains a JBI ERROR status

See Also:
Constant Field Values

JBI_MEP

protected static final String JBI_MEP
JMS property holding the Message Exchange Pattern

See Also:
Constant Field Values

JBI_INTERFACE

protected static final String JBI_INTERFACE
JMS property holding the interface QName for the exchange

See Also:
Constant Field Values

JBI_OPERATION

protected static final String JBI_OPERATION
JMS property holding the operation QName for the exchange

See Also:
Constant Field Values

JBI_SERVICE

protected static final String JBI_SERVICE
JMS property holding the service QName for the exchange

See Also:
Constant Field Values

JBI_ENDPOINT

protected static final String JBI_ENDPOINT
JMS property holding the endpoint for the exchange

See Also:
Constant Field Values

PROPERTY_CORR_ID

protected static final String PROPERTY_CORR_ID
JMS property holding the correlation id

See Also:
Constant Field Values

PROPERTY_SENDER_CORR_ID

protected static final String PROPERTY_SENDER_CORR_ID
JMS property holding the correlation id to be used for the reply.

See Also:
Constant Field Values

PROPERTY_ROLLBACK_ON_ERRORS

protected static final String PROPERTY_ROLLBACK_ON_ERRORS
JMS property containing the rollbackOnErrors flag for this message. This property is set on the IN message so that the cluster endpoint consuming this message will handle it with the right behavior in case the cluster endpoint that sent the message has a different configuration.

See Also:
Constant Field Values

PROPERTY_CLUSTER_NAME

protected static final String PROPERTY_CLUSTER_NAME
JMS property holding the name of the cluster (used with selectors)

See Also:
Constant Field Values

PROPERTY_SENDER_CLUSTER_NAME

protected static final String PROPERTY_SENDER_CLUSTER_NAME
JMS property holding the name of the cluster to use for the response

See Also:
Constant Field Values

logger

protected final org.slf4j.Logger logger

rollbackOnErrors

protected boolean rollbackOnErrors

name

protected String name

pool

protected JmsRequestorPool pool

channel

protected Channel channel

started

protected AtomicBoolean started

exchanges

protected final Map<String,Exchange> exchanges

selector

protected String selector

pendingExchanges

protected AtomicInteger pendingExchanges

pauseConsumption

protected AtomicBoolean pauseConsumption

maxPendingExchanges

protected int maxPendingExchanges
Constructor Detail

ClusterEngine

public ClusterEngine()
Method Detail

getChannel

public Channel getChannel()

setChannel

public void setChannel(Channel channel)
Description copied from interface: Endpoint
Set the channel so that the endpoint can send exchanges back when they are processed or act as a consumer itself. This method will be called by the NMR while the endpoint is registered. Such a channel does not need to be closed as the NMR will close it automatically when the endpoint is unregistered.

Specified by:
setChannel in interface Endpoint
Parameters:
channel - the channel that this endpoint can use
See Also:
EndpointRegistry.register(Endpoint, java.util.Map)

getPool

public JmsRequestorPool getPool()

setPool

public void setPool(JmsRequestorPool pool)

isRollbackOnErrors

public boolean isRollbackOnErrors()

setRollbackOnErrors

public void setRollbackOnErrors(boolean rollbackOnErrors)
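According to the class description, an exchange that comes back with an ERROR status causes a rollback (or a withheld ack) only when this flag is set and the transaction model is not None. A minimal sketch of that rule, using hypothetical stand-in enums rather than the real ClusterEngine or JBI types:

```java
// Hypothetical sketch of the rollback rule; not the real ClusterEngine code.
final class RollbackDecisionSketch {
    // Stand-ins for the four transactional models named in the class description.
    enum TransactionMode { NONE, CLIENT_ACK, JMS, XA }

    // Stand-in for the status of the exchange when it comes back.
    enum Status { ACTIVE, DONE, ERROR }

    // True when the transaction should be rolled back (or the ack withheld).
    static boolean shouldRollback(TransactionMode mode, boolean rollbackOnErrors, Status status) {
        return rollbackOnErrors
                && mode != TransactionMode.NONE
                && status == Status.ERROR;
    }

    public static void main(String[] args) {
        System.out.println(shouldRollback(TransactionMode.JMS, true, Status.ERROR));
        System.out.println(shouldRollback(TransactionMode.NONE, true, Status.ERROR));
    }
}
```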

getName

public String getName()

setName

public void setName(String name)
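A unique name for this cluster endpoint. The name has to be unique in the cluster, because it is set as a JMS property on messages so that each cluster endpoint consumes only its own responses.

Parameters:
name - the unique name of this cluster endpoint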

getMaxPendingExchanges

public int getMaxPendingExchanges()

setMaxPendingExchanges

public void setMaxPendingExchanges(int maxPendingExchanges)
Specifies the maximum number of pending exchanges on the JMS consumer side. This limits the number of JBI exchanges sent into the NMR at a given time. If set too high, the NMR may be flooded with exchanges and run out of memory. The default value is 4096.

Parameters:
maxPendingExchanges - the maximum number of pending exchanges
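The throttling behavior this property controls can be sketched as a simple counter. This is an illustration under assumptions, not the actual implementation: ClusterEngine exposes pendingExchanges and pauseConsumption fields, whereas this sketch derives the paused state from a single counter, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the pending-exchange limit; not the real ClusterEngine code.
final class PendingExchangeThrottle {
    private final int maxPendingExchanges;
    private final AtomicInteger pendingExchanges = new AtomicInteger();

    PendingExchangeThrottle(int maxPendingExchanges) {
        this.maxPendingExchanges = maxPendingExchanges;
    }

    // Called when a new request is sent into the NMR.
    void exchangeStarted() {
        pendingExchanges.incrementAndGet();
    }

    // Called when an exchange has been fully processed.
    void exchangeCompleted() {
        pendingExchanges.decrementAndGet();
    }

    // New requests are consumed only while the count is below the maximum;
    // replies would still be serviced regardless of this answer.
    boolean acceptsNewRequests() {
        return pendingExchanges.get() < maxPendingExchanges;
    }

    public static void main(String[] args) {
        PendingExchangeThrottle throttle = new PendingExchangeThrottle(2);
        throttle.exchangeStarted();
        throttle.exchangeStarted();
        System.out.println(throttle.acceptsNewRequests());
        throttle.exchangeCompleted();
        System.out.println(throttle.acceptsNewRequests());
    }
}
```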

afterPropertiesSet

public void afterPropertiesSet()
                        throws Exception
Throws:
Exception

start

public void start()
           throws Exception
Throws:
Exception

destroy

public void destroy()
             throws Exception
Throws:
Exception

pause

public void pause()

resume

public void resume()

endpointRegistered

public void endpointRegistered(InternalEndpoint endpoint)
Description copied from interface: EndpointListener
An endpoint has been registered

Specified by:
endpointRegistered in interface EndpointListener
Parameters:
endpoint - the registered endpoint

endpointUnregistered

public void endpointUnregistered(InternalEndpoint endpoint)
Description copied from interface: EndpointListener
An endpoint has been unregistered

Specified by:
endpointUnregistered in interface EndpointListener
Parameters:
endpoint - the unregistered endpoint

exchangeSent

public void exchangeSent(Exchange exchange)
Description copied from interface: ExchangeListener
Method called each time an exchange is sent

Specified by:
exchangeSent in interface ExchangeListener
Parameters:
exchange - the exchange sent

exchangeDelivered

public void exchangeDelivered(Exchange exchange)
Description copied from interface: ExchangeListener
Method called each time an exchange is delivered

Specified by:
exchangeDelivered in interface ExchangeListener
Parameters:
exchange - the delivered exchange

exchangeFailed

public void exchangeFailed(Exchange exchange)
Description copied from interface: ExchangeListener
Method called when an exchange resulted in an exception being thrown and was not delivered. This can happen if no endpoint can be found for the target or if something else goes wrong.

Specified by:
exchangeFailed in interface ExchangeListener
Parameters:
exchange - the exchange that failed

invalidateSelector

public void invalidateSelector()

getSelector

protected String getSelector()

getEndpoint

protected javax.jbi.servicedesc.ServiceEndpoint getEndpoint(QName serviceName,
                                                            String endpointName)

getAllEndpoints

protected List<javax.jbi.servicedesc.ServiceEndpoint> getAllEndpoints()

done

protected void done(Exchange exchange)

fail

protected void fail(Exchange exchange,
                    Exception e)

send

protected void send(Exchange exchange)

process

public void process(Exchange exchange)
Description copied from interface: Endpoint
Process the given exchange. The processing can occur in the current thread or asynchronously. If an endpoint has sent an exchange asynchronously to another endpoint, it will receive the exchange back using this method. An endpoint can recognize such exchanges by checking the role of the exchange.

Specified by:
process in interface Endpoint
Parameters:
exchange - the exchange to process
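The role check mentioned above can be illustrated with a minimal sketch. The Role enum and Exchange interface below are simplified stand-ins written for this example, not the real org.apache.servicemix.nmr.api types, and the assumption is that an exchange coming back to the endpoint that sent it has the consumer role.

```java
// Stand-in types for illustration only; not the real NMR API.
final class RoleDispatchSketch {
    enum Role { CONSUMER, PROVIDER }

    interface Exchange {
        Role getRole();
    }

    // Returns a label describing how an endpoint would treat the exchange.
    static String classify(Exchange exchange) {
        if (exchange.getRole() == Role.CONSUMER) {
            // This endpoint sent the exchange earlier; it is now coming back.
            return "returned";
        }
        // This endpoint is the target of an exchange sent by another endpoint.
        return "incoming";
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> Role.CONSUMER));
        System.out.println(classify(() -> Role.PROVIDER));
    }
}
```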

process

protected void process(JmsRequestor requestor)
                throws javax.jms.JMSException
Process a JMS message

Parameters:
requestor - the item to use
Throws:
javax.jms.JMSException - if an error occurs

processExchange

protected void processExchange(JmsRequestor requestor,
                               Exchange exchange)
                        throws Exception
Process a JBI exchange

Parameters:
requestor - the item to use
exchange - the exchange to process
Throws:
Exception - if an error occurs

decrementPendingExchangeIfNeeded

protected void decrementPendingExchangeIfNeeded(Exchange exchange)

unmarshallMessage

protected Message unmarshallMessage(javax.jms.Message message)
                             throws javax.jms.JMSException
Throws:
javax.jms.JMSException


Copyright © 2005-2012 FuseSource. All Rights Reserved.