I am using ServiceMix and I am trying to use XA transactions correctly. Basically, I want my route wrapped in an XA transaction so that the ActiveMQ and Microsoft SQL Server persistence is atomic.
Ideally, I want every transaction that fails X times to be moved to a Dead Letter Queue for later processing.
Currently for the class "ServiceMixXaRollbackTest" below...
If a transaction fails more than 2 times, it is moved to the Dead Letter Queue: "DLQ.my_test_thirdparty".
However if I comment out the code:
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" );
Then the JMS XML message that has been dropped into "my_test_thirdparty" will get consumed and disappear after 4 retries.
Is the way in which I undertake my Camel Error Handling correct? Any help would be much appreciated.
Regards,
Mark
Below are some of the articles that may be related.
http://servicemix.396122.n5.nabble.com/smx4-camel2-2-transactional-error-handling-td420449.html
http://camel.465427.n5.nabble.com/Transaction-Error-Handler-with-Dead-Letter-Channel-td3232320.html
http://tmielke.blogspot.com/2011/07/error-handling-in-camel-for-jms.html
http://weblog.plexobject.com/?p=1672
http://camel.apache.org/transactionerrorhandler.html
http://camel.apache.org/transactional-client.html
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
- I have one "src/main/resources/META-INF/spring/resource-context.xml" file:
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context wiring the XA resources used by the tests:
  an Atomikos JTA transaction manager, an XA-capable ActiveMQ connection
  factory wrapped by Atomikos, and an XA-capable SQL Server data source
  wrapped by Atomikos.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
       http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
       http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Atomikos JTA transaction manager; started and stopped with the Spring context. -->
    <bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
          init-method="init" destroy-method="close">
        <property name="forceShutdown" value="false" />
    </bean>

    <!-- NOTE(review): transactionTimeout is presumably in seconds (JTA convention); confirm. -->
    <bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="1000" />
    </bean>

    <!-- Spring PlatformTransactionManager facade over the two Atomikos beans above. -->
    <bean id="JtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="AtomikosTransactionManager" />
        <property name="userTransaction" ref="AtomikosUserTransaction" />
    </bean>

    <!-- Raw XA connection factory pointing at the broker on localhost:61616. -->
    <bean id="activemq" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <!-- Atomikos wrapper around the raw XA connection factory. -->
    <bean id="ConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="activemq" />
    </bean>

    <!--
      Atomikos wrapper around the raw XA data source. The init/destroy methods
      mirror the ConnectionFactory bean so the resource "ds1" is released when
      the context closes and can be registered again by the next context.
    -->
    <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds1" />
        <property name="xaDataSource" ref="dataSourceRaw" />
        <property name="testQuery" value="select 1" />
    </bean>

    <!-- Raw SQL Server XA data source; replace the placeholder values below. -->
    <bean id="dataSourceRaw" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
        <property name="serverName" value="localhost" />
        <property name="portNumber" value="1433" />
        <property name="selectMethod" value="cursor" />
        <property name="databaseName" value="theDatabaseName" />
        <property name="user" value="theUserName" />
        <property name="password" value="thePassword" />
    </bean>
</beans>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
- I have one properties file "src/main/resources/jta.properties":
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Atomikos transaction-manager settings used by the test setup.
# Selects the user transaction service factory implementation.
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
# File/base names for the console output and the transaction log.
com.atomikos.icatch.console_file_name = tm-dev.out
com.atomikos.icatch.log_base_name = tmlog-dev
com.atomikos.icatch.tm_unique_name = tmdev
com.atomikos.icatch.serial_jta_transactions=false
com.atomikos.icatch.automatic_resource_registration=true
# NOTE(review): presumably the cap on concurrently active transactions and the
# maximum allowed timeout in milliseconds; confirm against the Atomikos docs.
com.atomikos.icatch.max_actives=15000
com.atomikos.icatch.max_timeout=3600000
# Console output and transaction logs are both written under atomikosxatm/.
com.atomikos.icatch.output_dir=atomikosxatm/
com.atomikos.icatch.log_base_dir=atomikosxatm/
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
- I use the embedded ServiceMix 4.3 ActiveMQ instance; otherwise you could use the following file "src/test/resources/broker-context.xml":
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Optional stand-alone broker definition for running the tests outside
  ServiceMix. It declares a single broker named "localhost" with JMX and
  persistence switched off, listening on tcp://localhost:61616.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- setup a local JMS Broker for testing purpose -->
<!-- persistent="false": messages are not stored on disk, so nothing survives a broker restart. -->
<broker:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
<broker:transportConnectors>
<!-- The TCP endpoint that JMS clients connect to. -->
<broker:transportConnector uri="tcp://localhost:61616"/>
</broker:transportConnectors>
</broker:broker>
</beans>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
- I have one java test class to test successful XA Commit.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
package test.xa;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.language.XPath;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ServiceMixXaCommitTest extends CamelSpringTestSupport
{
protected JdbcTemplate jdbc;
@BeforeMethod
public void setupDatabase() throws Exception {
super.setUp();
DataSource ds = context.getRegistry().lookup("dataSourceRaw", DataSource.class);
jdbc = new JdbcTemplate(ds);
// jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class);
try
{
jdbc.execute( "drop table messaging.my_test_thirdparty" );
}
catch ( Exception e )
{
// ignore
}
jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )");
}
@AfterMethod
public void restoreDatabase() throws Exception {
jdbc.execute("drop table messaging.my_test_thirdparty");
}
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
ActiveMQXAConnectionFactory connectionFactory = applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class);
camelContext.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
// DataSource dataSource = applicationContext.getBean("dataSource", DataSource.class);
//
// JdbcComponent jdbcComponent = new JdbcComponent();
// jdbcComponent.setDataSource(dataSource);
// camelContext.addComponent("jdbc", jdbcComponent);
camelContext.addRoutes( createRouteBuilder() );
return camelContext;
}
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"});
}
@Test
public void testXaRollbackAfterDb() throws Exception {
// This database table should be empty
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0);
String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>";
template.sendBody("activemq:queue:my_test_thirdparty", xml);
// Wait for route to fail
Thread.sleep(15000);
// There should be 1 row in the database
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 1);
// Check ActiveMq to ensure final state
String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class);
Assert.assertNull(dlq, "Should not find message message");
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new SpringRouteBuilder() {
@Override
public void configure() throws Exception {
// Non-transactional dead letter queue.
// errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500));
ErrorHandlerBuilder errorHandlerBuilder = transactionErrorHandler();
RedeliveryPolicy redeliveryPolicy = ((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy();
redeliveryPolicy.setRedeliveryDelay( 500 );
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000 ); // Max = 30 minutes
redeliveryPolicy.setMaximumRedeliveries(4);
errorHandler(errorHandlerBuilder);
// Without the below onException call, the JMS Queue Message gets consumed and disappears.
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" );
from("activemq:queue:my_test_thirdparty")
.transacted()
.log("
+ Before Database Call
+")
.bean(ServiceMixXaCommitTest.class, "toSql")
.to("jdbc:dataSource")
.log("
+ After Database Call
+")
;
}
};
}
/*
* <?xml version="1.0"?><thirdparty id="123"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>
*
*/
public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
@XPath("thirdparty/name/text()") String name,
@XPath("thirdparty/date/text()") long created,
@XPath("thirdparty/code/text()") int status_code) {
if (thirdpartyId <= 0) {
throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId);
}
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES (");
sb.append("'").append(thirdpartyId).append("', ");
sb.append("'").append(name).append("', ");
sb.append("'").append(created).append("', ");
sb.append("'").append(status_code).append("') ");
return sb.toString();
}
}
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
- I have one java test class to test successful XA Rollback.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
package test.xa;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.language.XPath;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ServiceMixXaRollbackTest extends CamelSpringTestSupport
{
protected JdbcTemplate jdbc;
@BeforeMethod
public void setupDatabase() throws Exception {
super.setUp();
DataSource ds = context.getRegistry().lookup("dataSourceRaw", DataSource.class);
jdbc = new JdbcTemplate(ds);
// jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class);
try
{
jdbc.execute( "drop table messaging.my_test_thirdparty" );
}
catch ( Exception e )
{
// ignore
}
jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )");
}
@AfterMethod
public void restoreDatabase() throws Exception {
jdbc.execute("drop table messaging.my_test_thirdparty");
}
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
ActiveMQXAConnectionFactory connectionFactory = applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class);
camelContext.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
// DataSource dataSource = applicationContext.getBean("dataSource", DataSource.class);
//
// JdbcComponent jdbcComponent = new JdbcComponent();
// jdbcComponent.setDataSource(dataSource);
// camelContext.addComponent("jdbc", jdbcComponent);
camelContext.addRoutes( createRouteBuilder() );
return camelContext;
}
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"});
}
@Test
public void testXaRollbackAfterDb() throws Exception {
// This database table should be empty
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0);
String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>";
template.sendBody("activemq:queue:my_test_thirdparty", xml);
// Wait for route to fail
Thread.sleep(15000);
// The database should NOT have any new rows inserted to it.
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0);
// Check ActiveMq to ensure final state
String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class);
Assert.assertNotNull(dlq, "Should not lose message");
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new SpringRouteBuilder() {
@Override
public void configure() throws Exception {
// Non-transactional dead letter queue.
// errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500));
ErrorHandlerBuilder errorHandlerBuilder = transactionErrorHandler();
RedeliveryPolicy redeliveryPolicy = ((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy();
redeliveryPolicy.setRedeliveryDelay( 500 );
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000 ); // Max = 30 minutes
redeliveryPolicy.setMaximumRedeliveries(4);
errorHandler(errorHandlerBuilder);
// Without the below onException call, the JMS Queue Message gets consumed and disappears.
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" );
from("activemq:queue:my_test_thirdparty")
.transacted()
.log("
+ Before Database Call
+")
.bean(ServiceMixXaRollbackTest.class, "toSql")
.to("jdbc:dataSource")
.log("
+ After Database Call
+")
.throwException(new IllegalArgumentException("Unexpected Exception"))
;
}
};
}
/*
* <?xml version="1.0"?><thirdparty id="123"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>
*
*/
public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
@XPath("thirdparty/name/text()") String name,
@XPath("thirdparty/date/text()") long created,
@XPath("thirdparty/code/text()") int status_code) {
if (thirdpartyId <= 0) {
throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId);
}
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES (");
sb.append("'").append(thirdpartyId).append("', ");
sb.append("'").append(name).append("', ");
sb.append("'").append(created).append("', ");
sb.append("'").append(status_code).append("') ");
return sb.toString();
}
}