Index: activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (revision 682804) +++ activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (working copy) @@ -378,9 +378,9 @@ } } if (callDispatchMatched && destination != null) { - if (destination.isLazyDispatch()) { +// if (destination.isLazyDispatch()) { destination.wakeup(); - } +// } dispatchPending(); } else { if (isSlave()) { Index: activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (revision 682804) +++ activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (working copy) @@ -81,6 +81,8 @@ protected final List consumers = new ArrayList(50); protected PendingMessageCursor messages; private final LinkedHashMap pagedInMessages = new LinkedHashMap(); + // Messages that are paged in but have not yet been targeted at a subscription + private List pagedInPendingDispatch = new ArrayList(100); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); @@ -317,6 +319,7 @@ } public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { +// System.out.println(getName()+" send "+message.getMessageId()); final ConnectionContext context = producerExchange.getConnectionContext(); // There is delay between the client sending it and it arriving at the // destination.. it may have expired. @@ -946,6 +949,18 @@ result = !messages.isEmpty(); } + // Kinda ugly.. but I think dispatchLock is the only mutex protecting the + // pagedInPendingDispatch variable. + dispatchLock.lock(); + try { + result |= !pagedInPendingDispatch.isEmpty(); + } finally { + dispatchLock.unlock(); + } + + // Perhaps we should page always into the pagedInPendingDispatch list is + // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() + // then we do a dispatch. if (result) { try { pageInMessages(false); @@ -1134,58 +1149,76 @@ } private void doDispatch(List list) throws Exception { - if (list != null) { - List consumers; - dispatchLock.lock(); - try { - synchronized (this.consumers) { - consumers = new ArrayList(this.consumers); + dispatchLock.lock(); + try { + if(!pagedInPendingDispatch.isEmpty()) { +// System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size()); + // Try to first dispatch anything that had not been dispatched before. + pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); +// System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size()); + } + // and now see if we can dispatch the new stuff.. and append to the pending + // list anything that does not actually get dispatched. + if (list != null && !list.isEmpty()) { +// System.out.println(getName()+": dispatching from paged in: "+list.size()); + pagedInPendingDispatch.addAll(doActualDispatch(list)); +// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size()); + } + } finally { + dispatchLock.unlock(); + } + } + + /** + * @return list of messages that could get dispatched to consumers if they were not full. + */ + private List doActualDispatch(List list) throws Exception { + List rc = new ArrayList(list.size()); + List consumers; + + synchronized (this.consumers) { + consumers = new ArrayList(this.consumers); + } + + for (MessageReference node : list) { + Subscription target = null; + int interestCount=0; + for (Subscription s : consumers) { + if (dispatchSelector.canSelect(s, node)) { + if (!s.isFull()) { + // Dispatch it. + s.add(node); +// System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId()); + target = s; + break; + } + interestCount++; } + } - - for (MessageReference node : list) { - Subscription target = null; - List targets = null; - for (Subscription s : consumers) { - if (dispatchSelector.canSelect(s, node)) { - if (!s.isFull()) { - s.add(node); - target = s; - break; - } else { - if (targets == null) { - targets = new ArrayList(); - } - targets.add(s); - } - } + if (target == null && interestCount>0) { + // This means all subs were full... + rc.add((QueueMessageReference)node); + } + + // If it got dispatched, rotate the consumer list to get round robin distribution. + if (target != null && !strictOrderDispatch && consumers.size() > 1 && + !dispatchSelector.isExclusiveConsumer(target)) { + synchronized (this.consumers) { + if( removeFromConsumerList(target) ) { + addToConsumerList(target); + consumers = new ArrayList(this.consumers); } - if (target == null && targets != null) { - // pick the least loaded to add the message too - for (Subscription s : targets) { - if (target == null - || target.getPendingQueueSize() > s.getPendingQueueSize()) { - target = s; - } - } - if (target != null) { - target.add(node); - } - } - if (target != null && !strictOrderDispatch && consumers.size() > 1 && - !dispatchSelector.isExclusiveConsumer(target)) { - synchronized (this.consumers) { - if( removeFromConsumerList(target) ) { - addToConsumerList(target); - consumers = new ArrayList(this.consumers); - } - } - } } - } finally { - dispatchLock.unlock(); } } + + LOG.info(getName()+" Pending messages:"); + for (MessageReference n : rc) { + LOG.info(getName()+" - " + n.getMessageId()); + } + + return rc; } private void pageInMessages() throws Exception {