LibraryLink ToToggle FramesPrintFeedback

Chapter 2. ActiveMQ

The ActiveMQ component allows messages to be sent to a JMS Queue or Topic; or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.

This component is based on the JMS Component and uses Spring's JMS support for declarative transactions, using Spring's JmsTemplate for sending and a MessageListenerContainer for consuming. All the options from the JMS component also apply for this component.

To use this component, make sure you have the activemq.jar or activemq-core.jar on your classpath along with any Apache Camel dependencies such as camel-core.jar, camel-spring.jar and camel-jms.jar.

activemq:[queue:|topic:]destinationName

Where destinationName is an ActiveMQ queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue, FOO.BAR, use:

activemq:FOO.BAR

You can include the optional queue: prefix, if you prefer:

activemq:queue:FOO.BAR

To connect to a topic, you must include the topic: prefix. For example, to connect to the topic, Stocks.Prices, use:

activemq:topic:Stocks.Prices

See Options on the JMS component as all these options also apply for this component.

The following test case shows how to add an ActiveMQComponent to the CamelContext using the activeMQComponent() method while specifying the brokerURL used to connect to ActiveMQ

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

You can configure the ActiveMQ broker URL on the ActiveMQComponent as follows

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>


  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

When sending to an ActiveMQ broker using Camel it's recommended to use a pooled connection factory to handle efficient pooling of JMS connections, sessions and producers. This is documented in the page ActiveMQ Spring Support.

You can grab Jencks AMQ pool with maven:

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.3.2</version>
    </dependency>

And then setup the activemq component as follows:

     <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="8" />
        <property name="maximumActive" value="500" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>


    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="transacted" value="false"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener to a Processor. This means that the Bean component is capable of invoking any JMS MessageListener bean directly inside any route.

So for example you can create a MessageListener in JMS as follows:

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

Then use it in your route as follows

from("file://foo/bar").
  bean(MyListener.class);

That is, you can reuse any of the Apache Camel Components and easily integrate them into your JMS MessageListener POJO\!

ActiveMQ can generates Advisory messages which are put in topics that you can consume. Such messages can help you to send alarm in case you detect slow consumers or to build statistics (nber of messages/produced per day, ...). The following Spring DSL example shows you how to read messages from Connection Topic :

The route start by reading the topic : ActiveMQ.Advisory.Connection. To watch another topic, simply change the name according to the name provided in ActiveMQ Advisory Messages documentation. The parameter mapJmsMessage=false allows to convert the org.apache.activemq.command.ActiveMqMessage object from the jms queue. Next, the body received is converted into a String for the example purpose and a carriage return is added. Finally, the string is added to a file

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

If you produce consume a message on a queue, you should see the following files under data/activemq folder :

advisoryConnection-20100312.txt advisoryProducer-20100312.txt

and containing string :

ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140-1:0:0:0:221, originalDestination = null, 
originalTransactionId = null, producerId = ID:dell-charles-3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, 
transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null,
 replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, 
 userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = 
 ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = 
 ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true,
 clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-3258-1268399815140-0:0,
  originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}

You need these dependencies

You must have the camel-jms as dependency as ActiveMQ is an extension to the JMS component.

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-jms</artifactId>
  <version>1.6.0</version>
</dependency>

The ActiveMQ component is released with the ActiveMQ project itself. For Maven 2 users you just need to add the following dependency to your project.

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-camel</artifactId>
  <version>5.2.0</version>
</dependency>

For 5.1.0 its in the activemq-core library

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.1.0</version>
</dependency>

Alternatively you can download the component JAR file directly from the Maven repository:

For this version you must use the JMS component instead. Please be careful to use a pooling connection factory as described in the JmsTemplate Gotchas