
Thread: JMS and Database interactions under the same transactional context

 
This question is answered. Helpful answers available: 2. Correct answers available: 1.


Replies: 17 - Last Post: Feb 2, 2012 6:04 PM by davsclaus
marks1900

Posts: 12
Registered: 09/01/11
JMS and Database interactions under the same transactional context
Posted: Sep 1, 2011 4:35 PM
 
I am currently using ServiceMix 4.3 out of the box, and I am hoping someone can give me a straightforward, concrete example of how to wrap a process that includes JMS and database interactions in the same transactional context.

I have looked at the following resources, though I am looking for more of a working example.

http://fusesource.com/docs/esb/4.2/deploy_osgi/ESBOSGiIntro-Txn.html
http://fusesource.com/docs/esb/4.4/esb_deploy_osgi/ESBOSGiIntro-Txn.html

Also, is there a best practice for configuring and using an XA transaction that spans a JMS resource and a database resource?

marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Sep 2, 2011 2:06 PM   in response to: marks1900
 
My specific environment is Microsoft SQL Server 2008 and ActiveMq.

Does anyone have any XA examples? Any information or tutorial would be greatly appreciated.
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Sep 6, 2011 4:15 AM   in response to: marks1900
 
There is also a Camel transaction guide
http://fusesource.com/require_registration_for?url=http://fusesource.com/docs/router/2.7/transactions/index.html

Chapter 9 of the Camel in Action book covers transactions as well, including XA.
http://fusesource.com/fuse/apache-books/
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Sep 8, 2011 3:21 PM   in response to: davsclaus
 
Thanks for that reply.

I ended up finding the example that goes with that book:

http://camelinaction.googlecode.com/svn/trunk/chapter9/xa/src/test/resources/spring-context.xml

Ideally, I am looking for an example that combines the following resources in a Blueprint or Spring configuration file:

<bean id="myDataSource" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">

<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 17, 2011 10:07 PM   in response to: marks1900
 
I am using ServiceMix and I am attempting to use XA transactions correctly. Basically, I want my route wrapped in an XA transaction so that ActiveMQ and Microsoft SQL Server persistence is atomic.

Ideally, I want all transactions that fail X number of times to move into a Dead Letter Queue for later processing.
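
To make "atomic" concrete, here is a minimal sketch of the two-phase commit idea that an XA transaction manager applies across a JMS resource and a database resource. It is a toy model only: ToyResource, RecordingResource and ToyCoordinator are hypothetical names invented for illustration, and none of this is the JTA, Atomikos or ActiveMQ API.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical participant in a toy two-phase commit; not the JTA XAResource API. */
interface ToyResource {
    boolean prepare();   // vote: true means "ready to commit"
    void commit();
    void rollback();
}

/** In-memory stand-in for a resource such as a JMS session or a JDBC connection. */
final class RecordingResource implements ToyResource {
    final String name;
    final boolean voteYes;
    String state = "ACTIVE";

    RecordingResource(String name, boolean voteYes) {
        this.name = name;
        this.voteYes = voteYes;
    }

    @Override public boolean prepare() {
        state = voteYes ? "PREPARED" : "FAILED";
        return voteYes;
    }

    @Override public void commit() { state = "COMMITTED"; }

    @Override public void rollback() { state = "ROLLED_BACK"; }
}

/** Toy coordinator: commit every participant only if every participant votes yes. */
final class ToyCoordinator {
    static boolean complete(List<? extends ToyResource> resources) {
        boolean allPrepared = true;
        // Phase 1: ask each participant to prepare; stop at the first "no" vote.
        for (ToyResource r : resources) {
            if (!r.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2: apply the same outcome to all participants.
        for (ToyResource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return allPrepared;
    }
}

final class ToyXaDemo {
    public static void main(String[] args) {
        RecordingResource jms = new RecordingResource("jms", true);
        RecordingResource db = new RecordingResource("db", false); // simulate a failed insert
        boolean committed = ToyCoordinator.complete(Arrays.asList(jms, db));
        System.out.println(committed + " " + jms.state + " " + db.state);
    }
}
```

The point of the sketch is only the outcome rule: if the database side cannot prepare, the message consumption is rolled back too, so neither resource ends up with a half-finished result.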



Currently for the class "ServiceMixXaRollbackTest" below...

If a transaction fails more than 2 times, it moves to the Dead Letter Queue "DLQ.my_test_thirdparty".

However if I comment out the code:

onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" );

Then the JMS XML message that has been dropped into "my_test_thirdparty" will get consumed and disappear after 4 retries.



Is my approach to Camel error handling correct? Any help would be much appreciated.


Regards,
Mark



Below are some of the articles that may be related.

http://servicemix.396122.n5.nabble.com/smx4-camel2-2-transactional-error-handling-td420449.html
http://camel.465427.n5.nabble.com/Transaction-Error-Handler-with-Dead-Letter-Channel-td3232320.html
http://tmielke.blogspot.com/2011/07/error-handling-in-camel-for-jms.html
http://weblog.plexobject.com/?p=1672
http://camel.apache.org/transactionerrorhandler.html
http://camel.apache.org/transactional-client.html


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  • I have one "src/main/resources/META-INF/spring/resource-context.xml" file:
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown" value="false" />
</bean>

<bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="1000" />
</bean>

<bean id="JtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="AtomikosTransactionManager" />
<property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>

<bean id="activemq" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="ConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="amq1" />
<property name="xaConnectionFactory" ref="activemq" />
</bean>

<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="ds1" />
<property name="xaDataSource" ref="dataSourceRaw" />
<property name="testQuery" value="select 1" />
</bean>

<bean id="dataSourceRaw" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
<property name="serverName" value="localhost" />
<property name="portNumber" value="1433" />
<property name="selectMethod" value="cursor" />
<property name="databaseName" value="theDatabaseName" />
<property name="user" value="theUserName" />
<property name="password" value="thePassword" />
</bean>

</beans>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  • I have one properties file, "src/main/resources/jta.properties":
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.console_file_name = tm-dev.out
com.atomikos.icatch.log_base_name = tmlog-dev
com.atomikos.icatch.tm_unique_name = tmdev
com.atomikos.icatch.serial_jta_transactions=false
com.atomikos.icatch.automatic_resource_registration=true
com.atomikos.icatch.max_actives=15000
com.atomikos.icatch.max_timeout=3600000
com.atomikos.icatch.output_dir=atomikosxatm/
com.atomikos.icatch.log_base_dir=atomikosxatm/

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  • I use the embedded ServiceMix 4.3 ActiveMq instance, otherwise you could use the following file "src/test/resources/broker-context.xml":
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- setup a local JMS Broker for testing purpose -->
<broker:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost">
<broker:transportConnectors>
<broker:transportConnector uri="tcp://localhost:61616"/>
</broker:transportConnectors>
</broker:broker>

</beans>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  • I have one java test class to test successful XA Commit.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package test.xa;

import javax.sql.DataSource;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.language.XPath;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ServiceMixXaCommitTest extends CamelSpringTestSupport
{
protected JdbcTemplate jdbc;

@BeforeMethod
public void setupDatabase() throws Exception {
super.setUp();
DataSource ds = context.getRegistry().lookup("dataSourceRaw", DataSource.class);
jdbc = new JdbcTemplate(ds);

// jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class);
try
{
jdbc.execute( "drop table messaging.my_test_thirdparty" );
}
catch ( Exception e )
{
// ignore
}
jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )");
}

@AfterMethod
public void restoreDatabase() throws Exception {
jdbc.execute("drop table messaging.my_test_thirdparty");
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();

ActiveMQXAConnectionFactory connectionFactory = applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class);
camelContext.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

// DataSource dataSource = applicationContext.getBean("dataSource", DataSource.class);
//
// JdbcComponent jdbcComponent = new JdbcComponent();
// jdbcComponent.setDataSource(dataSource);
// camelContext.addComponent("jdbc", jdbcComponent);

camelContext.addRoutes( createRouteBuilder() );

return camelContext;
}


@Override
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"});
}

@Test
public void testXaCommitAfterDb() throws Exception {

// This database table should be empty
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0);

String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>";
template.sendBody("activemq:queue:my_test_thirdparty", xml);

// Wait for the route to finish processing the message
Thread.sleep(15000);

// There should be 1 row in the database
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 1);

// Check ActiveMq to ensure final state
String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class);
Assert.assertNull(dlq, "Should not find a message in the DLQ");

}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new SpringRouteBuilder() {

@Override
public void configure() throws Exception {

// Non-transactional dead letter queue.
// errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500));

ErrorHandlerBuilder errorHandlerBuilder = transactionErrorHandler();

RedeliveryPolicy redeliveryPolicy = ((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy();
redeliveryPolicy.setRedeliveryDelay( 500 );
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000 ); // Max = 30 minutes
redeliveryPolicy.setMaximumRedeliveries(4);

errorHandler(errorHandlerBuilder);

// Without the below onException call, the JMS Queue Message gets consumed and disappears.
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" );

from("activemq:queue:my_test_thirdparty")
.transacted()
.log("+ Before Database Call +")
.bean(ServiceMixXaCommitTest.class, "toSql")
.to("jdbc:dataSource")
.log("+ After Database Call +")
;

}

};
}

/*
* <?xml version="1.0"?><thirdparty id="123"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>
*
*/
public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
@XPath("thirdparty/name/text()") String name,
@XPath("thirdparty/date/text()") long created,
@XPath("thirdparty/code/text()") int status_code) {

if (thirdpartyId <= 0) {
throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId);
}

StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES (");
sb.append("'").append(thirdpartyId).append("', ");
sb.append("'").append(name).append("', ");
sb.append("'").append(created).append("', ");
sb.append("'").append(status_code).append("') ");

return sb.toString();
}

}
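
A caveat on toSql above: because the INSERT is built by string concatenation, a name containing a single quote (for example O'Brien) would produce invalid SQL. Below is a minimal sketch of one way to guard against that by doubling embedded quotes, which is the standard SQL escaping rule. SqlLiteral and its methods are hypothetical helpers, not part of Camel or the JDBC driver, and parameter binding, where the component supports it, is generally the safer choice.

```java
/** Hypothetical helper for building quoted SQL string literals. */
final class SqlLiteral {

    /** Wraps a value in single quotes, doubling any embedded single quotes. */
    static String quote(String value) {
        if (value == null) {
            return "NULL";
        }
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds the same INSERT as toSql, but with every value passed through quote(). */
    static String insertThirdParty(int thirdpartyId, String name, long created, int statusCode) {
        return "INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES ("
                + quote(String.valueOf(thirdpartyId)) + ", "
                + quote(name) + ", "
                + quote(String.valueOf(created)) + ", "
                + quote(String.valueOf(statusCode)) + ")";
    }

    public static void main(String[] args) {
        System.out.println(insertThirdParty(123, "O'Brien", 201110140815L, 200));
    }
}
```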

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  • I have one java test class to test successful XA Rollback.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package test.xa;

import javax.sql.DataSource;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.language.XPath;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ServiceMixXaRollbackTest extends CamelSpringTestSupport
{
protected JdbcTemplate jdbc;

@BeforeMethod
public void setupDatabase() throws Exception {
super.setUp();
DataSource ds = context.getRegistry().lookup("dataSourceRaw", DataSource.class);
jdbc = new JdbcTemplate(ds);

// jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class);

try
{
jdbc.execute( "drop table messaging.my_test_thirdparty" );
}
catch ( Exception e )
{
// ignore
}
jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )");
}

@AfterMethod
public void restoreDatabase() throws Exception {
jdbc.execute("drop table messaging.my_test_thirdparty");
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();

ActiveMQXAConnectionFactory connectionFactory = applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class);
camelContext.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

// DataSource dataSource = applicationContext.getBean("dataSource", DataSource.class);
//
// JdbcComponent jdbcComponent = new JdbcComponent();
// jdbcComponent.setDataSource(dataSource);
// camelContext.addComponent("jdbc", jdbcComponent);

camelContext.addRoutes( createRouteBuilder() );

return camelContext;
}


@Override
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"});
}

@Test
public void testXaRollbackAfterDb() throws Exception {

// This database table should be empty
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0);

String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>";
template.sendBody("activemq:queue:my_test_thirdparty", xml);

// Wait for route to fail
Thread.sleep(15000);

// The database should NOT have any new rows inserted to it.
Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0);

// Check ActiveMq to ensure final state
String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class);
Assert.assertNotNull(dlq, "Should not lose message");
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new SpringRouteBuilder() {

@Override
public void configure() throws Exception {

// Non-transactional dead letter queue.
// errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500));

ErrorHandlerBuilder errorHandlerBuilder = transactionErrorHandler();

RedeliveryPolicy redeliveryPolicy = ((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy();
redeliveryPolicy.setRedeliveryDelay( 500 );
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000 ); // Max = 30 minutes
redeliveryPolicy.setMaximumRedeliveries(4);

errorHandler(errorHandlerBuilder);

// Without the below onException call, the JMS Queue Message gets consumed and disappears.
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" );

from("activemq:queue:my_test_thirdparty")
.transacted()
.log("+ Before Database Call +")
.bean(ServiceMixXaRollbackTest.class, "toSql")
.to("jdbc:dataSource")
.log("+ After Database Call +")
.throwException(new IllegalArgumentException("Unexpected Exception"))
;

}

};
}

/*
* <?xml version="1.0"?><thirdparty id="123"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>
*
*/
public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
@XPath("thirdparty/name/text()") String name,
@XPath("thirdparty/date/text()") long created,
@XPath("thirdparty/code/text()") int status_code) {

if (thirdpartyId <= 0) {
throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId);
}

StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES (");
sb.append("'").append(thirdpartyId).append("', ");
sb.append("'").append(name).append("', ");
sb.append("'").append(created).append("', ");
sb.append("'").append(status_code).append("') ");

return sb.toString();
}

}
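
For reference, the redelivery settings in configure() above (initial delay 500 ms, back-off multiplier 2, exponential back-off enabled, a 30 minute cap, 4 redeliveries) imply a simple delay schedule. The sketch below is a simplified model of that arithmetic, assuming each delay is the previous one multiplied by the back-off multiplier and then capped. BackoffSchedule is a hypothetical class, not Camel's RedeliveryPolicy.

```java
import java.util.Arrays;

/** Hypothetical model of an exponential back-off schedule with a maximum delay. */
final class BackoffSchedule {

    /** Returns the delay in milliseconds before each redelivery attempt. */
    static long[] delays(long initialDelay, double multiplier, long maxDelay, int maxRedeliveries) {
        long[] result = new long[maxRedeliveries];
        long delay = Math.min(initialDelay, maxDelay);
        for (int i = 0; i < maxRedeliveries; i++) {
            result[i] = delay;
            // Grow the delay for the next attempt, never exceeding the cap.
            delay = Math.min(Math.round(delay * multiplier), maxDelay);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] schedule = delays(500, 2.0, 30 * 60 * 1000L, 4);
        System.out.println(Arrays.toString(schedule)); // prints [500, 1000, 2000, 4000]
    }
}
```

Under this model the four redeliveries add up to 7.5 seconds of waiting, which fits inside the 15 second sleep in the test. If the onException clause with maximumRedeliveries(2) takes precedence for IllegalArgumentException, as I would expect, only the first two delays would apply.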

davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Oct 18, 2011 6:24 AM   in response to: marks1900
 
Hi

You may want to check out the 2-part webinar that Charles Moulliard is giving over the next 2 months about using databases and transactions with Fuse ESB.

See details at
http://fusesource.com/resources/video-archived-webinars/
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Oct 18, 2011 6:27 AM   in response to: davsclaus
 
Also check the Camel transactional guide
http://fusesource.com/products/enterprise-camel/#documentation

And as ActiveMQ also supports redelivery you should generally configure redelivery and dead letter channel in ActiveMQ and not Camel.
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
http://activemq.apache.org/redelivery-policy.html

Chapter 9 of the Camel in Action book also covers transactions, including XA. The sample source code for the book is free, so you may find some inspiration there.
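
To illustrate the redelivery and dead letter behaviour mentioned above, here is a toy model of what happens when a transacted consumer keeps rolling back: the message is delivered once, redelivered up to a maximum number of times, and then treated as poison and dead-lettered. ToyRedelivery is a hypothetical class and not ActiveMQ code; the value 6 used in main is ActiveMQ's documented default for maximumRedeliveries as far as I know, so check the redelivery policy page for your version.

```java
import java.util.function.IntPredicate;

/** Hypothetical model of redelivery after rollback, followed by dead-lettering. */
final class ToyRedelivery {

    static final class Outcome {
        final int attempts;
        final boolean deadLettered;

        Outcome(int attempts, boolean deadLettered) {
            this.attempts = attempts;
            this.deadLettered = deadLettered;
        }
    }

    /**
     * Delivers a message until the consumer commits or the redelivery limit is exceeded.
     * consumerCommits receives the 1-based attempt number and returns true on commit.
     */
    static Outcome deliver(int maximumRedeliveries, IntPredicate consumerCommits) {
        int attempts = 0;
        // One initial delivery plus up to maximumRedeliveries redeliveries.
        while (attempts <= maximumRedeliveries) {
            attempts++;
            if (consumerCommits.test(attempts)) {
                return new Outcome(attempts, false);
            }
        }
        return new Outcome(attempts, true);
    }

    public static void main(String[] args) {
        Outcome alwaysFails = deliver(6, attempt -> false);
        System.out.println(alwaysFails.attempts + " attempts, dead-lettered: " + alwaysFails.deadLettered);
    }
}
```

With this model a consumer that always rolls back sees 7 deliveries before the message is dead-lettered, while a consumer that commits on the third attempt never touches the DLQ.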
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 20, 2011 10:23 PM   in response to: davsclaus
 
Thanks once again for replying to these posts and I am very appreciative.

I had a close look at the Camel in Action example and updated my Spring resource XML, but I was still unable to get transaction rollback working as expected.

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
It seems that on rollback the ActiveMq message gets consumed and does NOT end up in the Dead Letter Queue.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

I have tried the default ActiveMq broker configuration that comes with ServiceMix 4.3 (etc/activemq-broker.xml) and also the broker configuration as it exists in the Camel in Action example (http://code.google.com/p/camelinaction/source/browse/trunk/chapter9/xa/src/test/resources/spring-context.xml).

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
My console logs definitely indicated that rollback has occurred.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
aultMessageListenerContainer-1 Pipeline DEBUG Message exchange has failed so breaking out of pipeline for exchange: ExchangeJmsMessage: null Exception: java.lang.IllegalArgumentException: Unexpected Exception
Atomikos:3 atomikos INFO XAResource.rollback ( 746D64657630303030313030323434:746D64657631 ) on resource ds1 represented by XAResource instance XAResourceID:2
aultMessageListenerContainer-1 atomikos INFO rollback() done of transaction tmdev0000100244
aultMessageListenerContainer-1 TransactionErrorHandler WARN Transaction rollback (0x4f88f506) for ExchangeId: ID:user-63311-1319148563241-0:5:1:1:1 due exception: java.lang.IllegalArgumentException: Unexpected Exception
aultMessageListenerContainer-1 EndpointMessageListener ERROR Caused by: org.apache.camel.RuntimeCamelException - java.lang.IllegalArgumentException: Unexpected Exception
org.apache.camel.RuntimeCamelException: java.lang.IllegalArgumentException: Unexpected Exception


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
My Test Case extends CamelSpringTestSupport, and the following is my route:
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new SpringRouteBuilder() {

        @Override
        public void configure() throws Exception {

            from("activemq:queue:my_test_thirdparty")
                .transacted()
                .log("+ Before Database Call +")
                .bean(ServiceMixXaRollbackTest.class, "toSql")
                .to("jdbc:dataSource")
                .log("+ After Database Call +")
                .throwException(new RuntimeException("Unexpected Exception"));
        }

    };
}


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

"src/main/resources/META-INF/spring/resource-context.xml" file:

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
xmlns:broker="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown" value="false" />
</bean>

<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="1000" />
</bean>

<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>


<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>


<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
</bean>


<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="amq1" />
<property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
</bean>

<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="ds1" />
<property name="xaDataSource" ref="dataSourceRaw" />
<property name="testQuery" value="select 1" />
</bean>

<bean id="dataSourceRaw" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
<property name="serverName" value="localhost" />
<property name="portNumber" value="1433" />
<property name="selectMethod" value="cursor" />
<property name="databaseName" value="databaseName" />
<property name="user" value="user" />
<property name="password" value="password" />
</bean>

</beans>

davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Oct 21, 2011 7:24 AM   in response to: marks1900
 
Are you running inside Apache ServiceMix 4.3, or Fuse ESB? If so what version exactly are you using?

SMX/Fuse ESB already comes with TX manager support (from Aries/Geronimo). See for example the org.apache.aries.transaction.cfg in the etc folder of the SMX/ESB
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 21, 2011 4:17 PM   in response to: davsclaus
 
From your comments, it seems I should change the way I am attempting to set up the transaction management of my database (Microsoft SQL Server) and message broker (ActiveMq) to use the out-of-the-box TX manager support (from Aries/Geronimo).

While the following link highlights what you are saying:
http://fusesource.com/docs/esb/4.4.1/esb_deploy_osgi/ESBOSGiIntro-Txn.html

From my understanding of what I want, I am looking for an example that uses a JMS XA connection factory class (ActiveMQXAConnectionFactory) and an XA datasource class (SQLServerXADataSource) to complete this XA/JTA transaction.

I even found the source code for the upcoming Fuse webinars, "Database Integration with Apache Camel", though I am quite bewildered without the presentation talk and slides.

https://github.com/cmoulliard/sparks/tree/master/fuse-webinars/camel-persistence-part1

https://github.com/cmoulliard/sparks/tree/master/fuse-webinars/camel-persistence-part2

Are there any specific examples that you know of that would help me along here?

marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 24, 2011 3:39 PM   in response to: davsclaus
 
I did find a reference to ActiveMq configuration on combining "JMS and JDBC operations in one transaction with Spring/Jencks/ActiveMQ":

http://activemq.apache.org/jms-and-jdbc-operations-in-one-transaction.html

Though, I am still searching for the relevant configuration to ensure my ServiceMix 4.3 Camel routes can have transactional integrity.
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 26, 2011 4:29 PM   in response to: davsclaus
 
So I have discovered some issues with my code. The following code seems to be a working example with Atomikos (resource-context.xml) with Test Case (ServiceMixXaRollbackTest) using a ServiceMix 4.3 ActiveMq instance (activemq-broker.xml).


Quick point:

davsclaus wrote:
SMX/Fuse ESB already comes with TX manager support (from Aries/Geronimo). See for example the org.apache.aries.transaction.cfg in the etc folder of the SMX/ESB

++ Thanks for highlighting this, and I am working on a solution that uses this as well. Now back to the Atomikos example.

Quick point:
davsclaus wrote:
And as ActiveMQ also supports redelivery you should generally configure redelivery and dead letter channel in ActiveMQ and not Camel.
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
http://activemq.apache.org/redelivery-policy.html

++ I will attempt to move this logic to ActiveMq.

Unfortunately, on transaction rollback (and retry exhaustion) I am expecting my ActiveMq message to move to the Dead Letter Queue "DLQ.my_test_thirdparty" as specified in the activemq-broker.xml, since I don't override any message redelivery and dlq handling behaviour in Camel. Instead I am finding that the ActiveMq message lands in the queue "ActiveMQ.DLQ".

What am I missing in my understanding of the behaviour of ActiveMq and Camel?
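
For anyone comparing the two queue names: as far as I understand it, ActiveMQ's default (shared) dead letter strategy sends every poison message to the single queue "ActiveMQ.DLQ", while the individual dead letter strategy derives a per-queue name from a configurable prefix. The sketch below only models that naming rule. ToyDeadLetterNaming is a hypothetical class, not ActiveMQ code, and the "DLQ." prefix is an assumption based on the queue name I expected, since the activemq-broker.xml policy is not shown in this post.

```java
/** Hypothetical model of how a dead letter queue name is chosen. */
final class ToyDeadLetterNaming {

    /** Shared strategy: one dead letter queue for all destinations. */
    static String shared() {
        return "ActiveMQ.DLQ";
    }

    /** Individual strategy: prefix plus the name of the original queue. */
    static String individual(String queuePrefix, String queueName) {
        return queuePrefix + queueName;
    }

    public static void main(String[] args) {
        System.out.println(shared());
        System.out.println(individual("DLQ.", "my_test_thirdparty"));
    }
}
```

So seeing the message in "ActiveMQ.DLQ" would be consistent with the shared strategy being in effect for this queue, for example if the destination policy entry carrying the individual strategy does not match the queue or is not loaded by the broker that the test connects to. That is a guess on my part, not something I have confirmed.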


Updated code below.

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Start code snippet: "src/main/resources/META-INF/spring/resource-context.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
xmlns:broker="http://activemq.apache.org/schema/core"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

<!--
<tx:annotation-driven transaction-manager="jtaTransactionManager" />
-->

<!-- Camel -->

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
</bean>

<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSource"/>
</bean>

<bean id="jdbc" class="org.apache.camel.component.jdbc.JdbcComponent">
<property name="dataSource" ref="dataSource"/>
</bean>

<!-- Atomikos -->

<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" >
<property name="forceShutdown" value="false" />
</bean>

<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" >
<property name="transactionTimeout" value="150" />
</bean>

<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
<property name="allowCustomIsolationLevels" value="true" />
</bean>

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jtaTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
<property name="transactionTemplate" ref="transactionTemplate" />
</bean>

<bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="jtaTransactionManager" />
<property name="isolationLevelName" value="ISOLATION_SERIALIZABLE"/>
<!-- <property name="timeout" value="10"/> -->
</bean>

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource"/>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="atomikosConnectionFactory" />
</bean>

<!-- JMS Connection Configuration -->

<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="amq1" />
<property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
<property name="localTransactionMode" value ="false" />
<!-- <property name="maxPoolSize" value="10" /> -->
</bean>

<!-- Database Connection Configuration -->

<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="ds1" />
<property name="testQuery" value="select 1" />
<property name="xaDataSource">
<bean class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
<property name="serverName" value="localhost" />
<property name="portNumber" value="1433" />
<property name="selectMethod" value="cursor" />
<property name="databaseName" value="messaging" />
<property name="user" value="__user__" />
<property name="password" value="_password__" />
</bean>
</property>
</bean>

</beans>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

End code snippet: "src/main/resources/META-INF/spring/resource-context.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Note 1 (Broke my transaction wrapping):

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
</bean>

+++++++++++++++++++ IS NOT EQUIVALENT TO +++++++++++++++++++

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="transactionManager" ref="jtaTransactionManager"/>
<property name="connectionFactory" ref="atomikosConnectionFactory"/>
<property name="transacted" value="true"/>
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsConfig"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig"/>
</bean>


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Note 2 (Broke my transaction wrapping):

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="ds1" />
<property name="testQuery" value="select 1" />
<property name="xaDataSource">
<bean class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
<property name="serverName" value="localhost" />
<property name="portNumber" value="1433" />
<property name="selectMethod" value="cursor" />
<property name="databaseName" value="messaging" />
<property name="user" value="__user__" />
<property name="password" value="_password__" />
</bean>
</property>
</bean>

+++++++++++++++++++ IS NOT EQUIVALENT TO +++++++++++++++++++

<bean id="atomikosDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="ds1" />
<property name="xaDataSource" ref="dataSource" />
<property name="testQuery" value="select 1" />
</bean>

<bean id="dataSource" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
<property name="serverName" value="localhost" />
<property name="portNumber" value="1433" />
<property name="selectMethod" value="cursor" />
<property name="databaseName" value="messaging" />
<property name="user" value="__user__" />
<property name="password" value="_password__" />
</bean>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Start code snippet: "src/test/java/xa/ServiceMixXaRollbackTest.java"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package test.xa;

import junit.framework.Assert;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.language.XPath;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;


public class ServiceMixXaRollbackTest extends CamelSpringTestSupport
{
protected JdbcTemplate jdbc;

@Override
@BeforeClass
public void setUp() throws Exception {
super.setUp();

jdbc = context.getRegistry().lookup("jdbcTemplate", JdbcTemplate.class);

try
{
jdbc.execute( "drop table messaging.my_test_thirdparty" );
}
catch ( Exception e )
{
// ignore
}
jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )");
}

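@Override
@AfterClass
public void tearDown() throws Exception {
try {
jdbc.execute("drop table messaging.my_test_thirdparty");
} finally {
// Let the base class stop the Camel and Spring contexts even if the drop fails.
super.tearDown();
}
}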

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
camelContext.addRoutes( createRouteBuilder() );
return camelContext;
}


@Override
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"});
}

@Test
public void testXaRollbackAfterDb() throws Exception {

Assert.assertEquals( 0, jdbc.queryForInt("select count(*) from my_test_thirdparty"));

String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>THE TEST THIRDPARTY</name><date>201110140815</date><code>200</code></thirdparty>";
template.sendBody("activemq:queue:my_test_thirdparty", xml);

Thread.sleep(15000);

Assert.assertEquals( 0, jdbc.queryForInt("select count(*) from my_test_thirdparty"));

String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class);
Assert.assertNotNull("Should not lose message", dlq);
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new SpringRouteBuilder() {

@Override
public void configure() throws Exception {

from("activemq:queue:my_test_thirdparty")
.transacted()
.log("+ Before Database Call +")
.bean(ServiceMixXaRollbackTest.class, "toSql")
.to("jdbc:dataSource")
.log("+ After Database Call +")
.throwException(new IllegalArgumentException("Unexpected Exception"))
;

}

};
}

public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
@XPath("thirdparty/name/text()") String name,
@XPath("thirdparty/date/text()") long created,
@XPath("thirdparty/code/text()") int status_code) {

if (thirdpartyId <= 0) {
throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId);
}

StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES (");
sb.append("'").append(thirdpartyId).append("', ");
sb.append("'").append(name).append("', ");
sb.append("'").append(created).append("', ");
sb.append("'").append(status_code).append("') ");

return sb.toString();
}

}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

End code snippet: "src/test/java/xa/ServiceMixXaRollbackTest.java"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
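
A side note on the toSql helper in the test above: it concatenates the values straight into the INSERT statement, so a name containing a single quote would break the SQL. A minimal sketch of the alternative is to keep the statement fixed and pass the values as bind parameters. The class ThirdpartyInsertSketch and the method toParams are hypothetical names for illustration only; the table and column names are taken from the test.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only: separates the SQL text from the values it binds. */
public class ThirdpartyInsertSketch {

    // '?' placeholders as understood by JDBC PreparedStatement and Spring's JdbcTemplate.
    public static final String INSERT_SQL =
        "INSERT INTO messaging.my_test_thirdparty "
        + "(thirdparty_id, name, created, status_code) VALUES (?, ?, ?, ?)";

    /** Returns the bind values in column order; the test table uses varchar columns, so numbers become strings. */
    public static List<Object> toParams(int thirdpartyId, String name, long created, int statusCode) {
        if (thirdpartyId <= 0) {
            throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId);
        }
        return Arrays.<Object>asList(
            String.valueOf(thirdpartyId),
            name,
            String.valueOf(created),
            String.valueOf(statusCode));
    }

    public static void main(String[] args) {
        // A quote in the name is harmless here because it never becomes part of the SQL text.
        List<Object> params = toParams(123, "O'BRIEN THIRDPARTY", 201110140815L, 200);
        System.out.println(INSERT_SQL);
        System.out.println(params);
    }
}
```

With the jdbcTemplate bean from resource-context.xml, the pair could then be executed as jdbcTemplate.update(INSERT_SQL, params.toArray()). Whether this fits the route itself depends on which Camel database component is used, so treat it as a starting point rather than a drop-in change.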

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Start code snippet: "apache-servicemix-4.3.0/etc/activemq-broker.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"
xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0"
xmlns:amq="http://activemq.apache.org/schema/core">

<ext:property-placeholder />

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="default" dataDirectory="${karaf.data}/activemq/default" useShutdownHook="false">

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="10mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">

<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

<managementContext>
<managementContext createConnector="false"/>
</managementContext>

<persistenceAdapter>
<kahaDB directory="${karaf.data}/activemq/default/kahadb"/>
</persistenceAdapter>

<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
</transportConnectors>

</broker>

<bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="8" />
<property name="connectionFactory" ref="activemqConnectionFactory" />
</bean>

<bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
<property name="transactionManager" ref="transactionManager" />
<property name="connectionFactory" ref="activemqConnectionFactory" />
<property name="resourceName" value="activemq.default" />
</bean>

<reference id="transactionManager" interface="javax.transaction.TransactionManager" />

<service ref="pooledConnectionFactory" interface="javax.jms.ConnectionFactory">
<service-properties>
<entry key="name" value="localhost"/>
</service-properties>
</service>

</blueprint>

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

End code snippet: "apache-servicemix-4.3.0/etc/activemq-broker.xml"

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 26, 2011 7:45 PM   in response to: davsclaus
 
davsclaus wrote:
Also check the Camel transactional guide
http://fusesource.com/products/enterprise-camel/#documentation

And as ActiveMQ also supports redelivery you should generally configure redelivery and dead letter channel in ActiveMQ and not Camel.
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
http://activemq.apache.org/redelivery-policy.html

The Camel in Action book, chapter 9, also covers transactions and using XA etc. The sample source code for the book is free, so you may find some inspiration there.

Thank you very much for your help. It did take a while to understand this, though your help has been pinpoint accurate.

++++++++++++++++++++++++++++++++++++++++++++++++++++++

Turns out that I didn't reload the ServiceMix ActiveMq bundle via the osgi:update command. This meant that the following text that I added was being ignored.

<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>

+ Thanks for the link: http://activemq.apache.org/message-redelivery-and-dlq-handling.html
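
For anyone hitting the same symptom, the naming rule behind the two queue names seen in this thread can be sketched in plain Java. This is only an illustration, not ActiveMQ's own code: the class DlqNameSketch and the method deadLetterQueueFor are hypothetical names, and a null prefix stands in for a broker that has not picked up the individualDeadLetterStrategy entry.

```java
import java.util.Objects;

/** Illustrative sketch only: mimics the dead letter queue naming rule, not the broker's implementation. */
public class DlqNameSketch {

    // Shared dead letter queue used when no per-destination strategy is in effect.
    public static final String SHARED_DLQ = "ActiveMQ.DLQ";

    /**
     * Returns the dead letter queue name for a queue.
     * A null queuePrefix models a broker without an individualDeadLetterStrategy loaded.
     */
    public static String deadLetterQueueFor(String queueName, String queuePrefix) {
        Objects.requireNonNull(queueName, "queueName");
        return queuePrefix == null ? SHARED_DLQ : queuePrefix + queueName;
    }

    public static void main(String[] args) {
        // Strategy not loaded (bundle not updated): message lands in the shared queue.
        System.out.println(deadLetterQueueFor("my_test_thirdparty", null));
        // Strategy loaded with queuePrefix="DLQ.": message lands in the per-queue DLQ.
        System.out.println(deadLetterQueueFor("my_test_thirdparty", "DLQ."));
    }
}
```

So a message turning up in "ActiveMQ.DLQ" instead of "DLQ.my_test_thirdparty" is a hint that the broker is still running with the old policy.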

++++++++++++++++++++++++++++++++++++++++++++++++++++++

I also updated my redelivery policy to be specific about JMS behaviour, as it appears this configuration has to be done on the client side:

<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="redeliveryPolicy">
<bean class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="1000"/>
<property name="backOffMultiplier" value="5"/>
<property name="useExponentialBackOff" value="true"/>
<property name="maximumRedeliveries" value="6"/>
</bean>
</property>
</bean>

+ Thanks for the link: http://activemq.apache.org/redelivery-policy.html
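
To get a feel for what these values mean in wall-clock time, here is a back-of-the-envelope sketch. It assumes the first redelivery waits initialRedeliveryDelay and each later one waits the previous delay multiplied by backOffMultiplier, with no maximum delay cap and no collision avoidance; the exact arithmetic inside the ActiveMQ client may differ between versions. The class RedeliveryScheduleSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only: estimates redelivery delays under a simple exponential back-off assumption. */
public class RedeliveryScheduleSketch {

    /** Delay in milliseconds before each redelivery attempt, in order. */
    public static List<Long> delays(long initialDelayMs, double multiplier, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            result.add(delay);
            // Assumed rule: next delay = previous delay * multiplier.
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    /** Sum of all delays, i.e. the minimum time before the message can be dead-lettered. */
    public static long total(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Values from the redeliveryPolicy bean above.
        List<Long> schedule = delays(1000, 5, 6);
        System.out.println(schedule);                 // [1000, 5000, 25000, 125000, 625000, 3125000]
        System.out.println(total(schedule) + " ms");  // 3906000 ms
    }
}
```

Under that assumption the six redeliveries add up to roughly 65 minutes, so a test that sleeps for 15 seconds before checking the DLQ would need a much smaller multiplier or fewer redeliveries to see the message arrive there.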


++++++++++++++++++++++++++++++++++++++++++++++++++++++

Once again, thank you!

++++++++++++++++++++++++++++++++++++++++++++++++++++++

On an unrelated issue, I made a mistake in one of my beans above: I forgot the sessionTransacted property. It should read:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="atomikosConnectionFactory" />
<property name="sessionTransacted" value="true" />
</bean>

Edited by: marks1900 on Oct 26, 2011 7:45 PM

davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Oct 28, 2011 8:42 AM   in response to: davsclaus
 
Using AMQ redelivery has the benefit that the message is already persisted in the broker.

So if the server crashes or is stopped during redelivery, the message is not lost: when the broker is restarted, redelivery simply continues from where it left off.

If you use Camel to do the redelivery, the message is held in memory only, so a server crash loses that message.

Camel redelivery makes sense when you do not use AMQ.
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Oct 26, 2011 7:47 PM   in response to: marks1900
 
I have managed to get JMS and Database interactions under the same transactional context using ActiveMq for JMS and Microsoft SQL Server as the Database.
davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Oct 27, 2011 6:18 AM   in response to: marks1900
 
Glad you got it working. Yeah TX is not easy.
marks1900

Posts: 12
Registered: 09/01/11
Re: JMS and Database interactions under the same transactional context
Posted: Feb 2, 2012 5:10 PM   in response to: marks1900
 
I just released some source code as a working example that others might find helpful.

== ServiceMix OSGi Aries Blueprint Atomikos Beans ==

https://bitbucket.org/mark1900/test-osgi/src/ff21eaa32e22/test-osgi-blueprint-xa-atomikos/src/main/resources/OSGI-INF/blueprint/resource-context.xml

== Atomikos + Camel Integration to achieve XA Transactions ==

My Camel configuration using these Atomikos beans is here:

https://bitbucket.org/mark1900/test-osgi/src/ff21eaa32e22/test-osgi-blueprint-xa-atomikos/src/main/resources/OSGI-INF/blueprint/xa-test-camel-context.xml

== Atomikos + Pojo Service Integration to achieve XA Transactions ==

My Service configuration using these Atomikos beans is here:

https://bitbucket.org/mark1900/test-osgi/src/ff21eaa32e22/test-osgi-blueprint-xa-atomikos/src/main/resources/OSGI-INF/blueprint/xa-test-service-context.xml

As I am deploying to the ServiceMix 4.4.x environment, I had to do my own manual transaction management, which is given here:

https://bitbucket.org/mark1900/test-osgi/src/ff21eaa32e22/test-osgi-blueprint-xa-atomikos/src/main/java/xo/transaction/XoTransactionProxyHandler.java

davsclaus

Posts: 1,893
Registered: 10/14/08
Re: JMS and Database interactions under the same transactional context
Posted: Feb 2, 2012 6:04 PM   in response to: marks1900
 
Thanks for sharing this with us.