Key: MB-853
Type: Improvement
Status: Resolved
Resolution: Fixed
Priority: Major
Assignee: Dejan Bosanac
Reporter: Susan Javurek
Votes: 0
Watchers: 0
FUSE Message Broker

Under heavy load, some connections are rejected when the KahaDB ThreadPoolExecutor cannot accept more tasks for execution.

Created: 06/Apr/11 10:19 AM   Updated: 10/May/11 10:16 AM
Component/s: broker
Affects Version/s: 5.4.2-fuse-02-00
Fix Version/s: 5.5.0-fuse-00-00

External Issue URL: https://issues.apache.org/jira/browse/AMQ-3272


Description
Hi,

When this scenario occurs, my broker throws the following error:

2011-03-04 09:46:55,295 [127.0.0.1:30197] WARN ProtocolConverter - Exception occured processing:
SEND
receipt:xxx
destination:/queue/loadq-9
persistent:true

Message from producer 9
yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghi
jklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrst
uvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcd
efghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmno
pqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxy
zabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghij
klmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstu
vwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcde
fghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghij

java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
at org.apache.activemq.store.kahadb.KahaDBStore.addQueueTask(KahaDBStore.java:262)
at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore.asyncAddQueueMessage(KahaDBStore.java:318)
at org.apache.activemq.store.kahadb.KahaDBTransactionStore.asyncAddQueueMessage(KahaDBTransactionStore.java:374)
at org.apache.activemq.store.kahadb.KahaDBTransactionStore$1.asyncAddQueueMessage(KahaDBTransactionStore.java:161)
at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:671)
at org.apache.activemq.broker.region.Queue.send(Queue.java:644)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:520)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
at org.apache.activemq.broker.UserIDBroker.send(UserIDBroker.java:56)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:461)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:310)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:253)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:178)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:70)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
at java.lang.Thread.run(Thread.java:619)

Unfortunately, I cannot reproduce this error. However, based on internal conversations, we'd like to add a rejected-execution handler that blocks when there is no space in the pool (instead of rejecting the task), and thus slows down the broker in this scenario.
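
To illustrate the request, here is a minimal sketch of a blocking handler built only on java.util.concurrent. The class names (BlockingRejectionSketch, BlockWhenFull), the pool sizes, and the runBurst helper are assumptions made for this example; this is not the KahaDBStore code and not necessarily how the eventual fix was implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockingRejectionSketch {

    /** Hypothetical handler: makes the submitting thread wait for queue space. */
    static final class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            if (pool.isShutdown()) {
                // Nothing will drain the queue reliably after shutdown, so still reject.
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                // put() blocks until a worker frees a slot, which throttles the producer.
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    /** Submits more tasks than the pool can hold at once; returns how many completed. */
    static int runBurst(int tasks) {
        // Deliberately tiny pool (1 thread, 2 queue slots) so rejection is triggered quickly.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new BlockWhenFull());
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(5); // simulate a slow store write
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runBurst(20));
    }
}
```

With the default AbortPolicy seen in the stack trace, the same burst would throw RejectedExecutionException once the queue filled up. The trade-off of blocking is that the transport thread calling execute() stalls until the store catches up, which is the "slow down the broker" behaviour asked for above.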



Comments
Dejan Bosanac added a comment - 11/Apr/11 08:38 AM
Hi,

we found a problem with the semaphore locking in the task executor, which we believe could be the reason the RejectedExecutionException occurred.

This is fixed now and available for testing in the latest (20110408.180305-10) snapshot from

http://repo.fusesource.com/nexus/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.5-fuse-SNAPSHOT/
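
The comment above does not describe the fix itself. As a generic illustration of what semaphore-gated task submission looks like, and where a permit can leak, here is a small sketch. SemaphoreGateSketch, its methods, and the pool sizes are hypothetical names and values chosen for this example; it is not the KahaDB implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreGateSketch {
    private final Semaphore permits;
    private final ExecutorService delegate;

    SemaphoreGateSketch(int maxInFlight, ExecutorService delegate) {
        this.permits = new Semaphore(maxInFlight);
        this.delegate = delegate;
    }

    /** Blocks the caller until fewer than maxInFlight tasks are queued or running. */
    void submit(Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // always return the permit, even if the task throws
                }
            });
        } catch (RuntimeException e) {
            // If the executor refuses the task, the permit must be given back here,
            // otherwise capacity shrinks with every failed submission.
            permits.release();
            throw e;
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    /** Runs the given number of trivial tasks through the gate; returns how many completed. */
    static int demo(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SemaphoreGateSketch gate = new SemaphoreGateSketch(4, pool);
        AtomicInteger done = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                gate.submit(() -> done.incrementAndGet());
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (gate.availablePermits() != 4) {
            throw new IllegalStateException("permit leaked");
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + demo(10));
    }
}
```

The point of the pattern is that the number of permits never exceeds what the executor can accept, and that every acquire is matched by exactly one release on all paths.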