LibraryLink ToToggle FramesPrintFeedback

Chapter 33. Hazelcast Component

Available as of Camel 2.7

The hazelcast: component allows you to work with the Hazelcast distributed data grid / cache. Hazelcast is a in memory data grid, entirely written in Java (single jar). It offers a great palette of different data stores like map, multi map (same key, n values), queue, list and atomic number. The main reason to use Hazelcast is its simple cluster support. If you have enabled multicast on your network you can run a cluster with hundred nodes with no extra configuration. Hazelcast can simply configured to add additional features like n copies between nodes (default is 1), cache persistence, network configuration (if needed), near cache, enviction and so on. For more information consult the Hazelcast documentation on http://www.hazelcast.com/documentation.jsp .

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-hazelcast</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

hazelcast:[ map | multimap | queue | seda | set | atomicvalue | instance]:cachename[?options]

[Warning]Warning

You have to use the second prefix to define which type of data store you want to use.

If you want to store a value in a map you can use the map cache producer. The map cache producer provides 5 operations (put, get, update, delete, query). For the first 4 you have to provide the operation inside the "hazelcast.operation.type" header variable. In Java DSL you can use the constants from org.apache.camel.component.hazelcast.HazelcastConstants.

Header Variables for the request message:

Name Type Description
hazelcast.operation.type String valid values are: put, delete, get, update, query
hazelcast.objectId String the object id to store / find your object inside the cache (not needed for the query operation)

You can call the samples with:

template.sendBodyAndHeader("direct:[put|get|update|delete|query]", "my-foo", HazelcastConstants.OBJECT_ID, "4711");

Java DSL:

from("direct:put")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))	
.toF("hazelcast:%sfoo", HazelcastConstants.MAP_PREFIX);

Spring DSL:

<route>
	<from uri="direct:put" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>put</constant>
	</setHeader>
	<to uri="hazelcast:map:foo" />
</route>

Java DSL:

from("direct:get")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.GET_OPERATION))	
.toF("hazelcast:%sfoo", HazelcastConstants.MAP_PREFIX)
.to("seda:out");

Spring DSL:

<route>
	<from uri="direct:get" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>get</constant>
	</setHeader>
	<to uri="hazelcast:map:foo" />
	<to uri="seda:out" />
</route>

Java DSL:

from("direct:update")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.UPDATE_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.MAP_PREFIX);

Spring DSL:

<route>
	<from uri="direct:update" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>update</constant>
	</setHeader>
	<to uri="hazelcast:map:foo" />
</route>

Java DSL:

from("direct:delete")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.DELETE_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.MAP_PREFIX);

Spring DSL:

<route>
	<from uri="direct:delete" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>delete</constant>
	</setHeader>
	<to uri="hazelcast:map:foo" />
</route>

Java DSL:

from("direct:query")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.QUERY_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.MAP_PREFIX)
.to("seda:out");

Spring DSL:

<route>
	<from uri="direct:query" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>query</constant>
	</setHeader>
	<to uri="hazelcast:map:foo" />
	<to uri="seda:out" />
</route>

For the query operation Hazelcast offers a SQL like syntax to query your distributed map.

String q1 = "bar > 1000";
template.sendBodyAndHeader("direct:query", null, HazelcastConstants.QUERY, q1);

Hazelcast provides event listeners on their data grid. If you want to be notified if a cache will be manipulated, you can use the map consumer. There're 4 events: put, update, delete and envict. The event type will be stored in the "hazelcast.listener.action" header variable. The map consumer provides some additional information inside these variables:

Header Variables inside the response message:

Name Type Description
hazelcast.listener.time Long time of the event in millis
hazelcast.listener.type String the map consumer sets here "cachelistener"
hazelcast.listener.action String type of event - here added, updated, envicted and removed
hazelcast.objectId String the oid of the object
hazelcast.cache.name String the name of the cache - e.g. "foo"
hazelcast.cache.type String the type of the cache - here map

The object value will be stored within put and update actions inside the message body.

Here's a sample:

fromF("hazelcast:%sfoo", HazelcastConstants.MAP_PREFIX)
.log("object...")
.choice()
    .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED))
         .log("...added")
         .to("mock:added")
    .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ENVICTED))
         .log("...envicted")
         .to("mock:envicted")
    .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.UPDATED))
         .log("...updated")
         .to("mock:updated")
    .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.REMOVED))
         .log("...removed")
         .to("mock:removed")
    .otherwise()
         .log("fail!");

A multimap is a cache where you can store n values to one key. The multimap producer provides 4 operations (put, get, removevalue, delete).

Header Variables for the request message:

Name Type Description
hazelcast.operation.type String valid values are: put, get, removevalue, delete
hazelcast.objectId String the object id to store / find your object inside the cache

Java DSL:

from("direct:put")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
.to(String.format("hazelcast:%sbar", HazelcastConstants.MULTIMAP_PREFIX));

Spring DSL:

<route>
	<from uri="direct:put" />
	<log message="put.."/>
	<setHeader headerName="hazelcast.operation.type">
		<constant>put</constant>
	</setHeader>
	<to uri="hazelcast:multimap:foo" />
</route>

Java DSL:

from("direct:removevalue")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.REMOVEVALUE_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.MULTIMAP_PREFIX);

Spring DSL:

<route>
	<from uri="direct:removevalue" />
	<log message="removevalue..."/>
	<setHeader headerName="hazelcast.operation.type">
		<constant>removevalue</constant>
	</setHeader>
	<to uri="hazelcast:multimap:foo" />
</route>

To remove a value you have to provide the value you want to remove inside the message body. If you have a multimap object } you have to put "my-foo" inside the message body to remove the "my-foo" value.

Java DSL:

from("direct:get")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.GET_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.MULTIMAP_PREFIX)
.to("seda:out");

Spring DSL:

<route>
	<from uri="direct:get" />
	<log message="get.."/>
	<setHeader headerName="hazelcast.operation.type">
		<constant>get</constant>
	</setHeader>
	<to uri="hazelcast:multimap:foo" />
	<to uri="seda:out" />
</route>

Java DSL:

from("direct:delete")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.DELETE_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.MULTIMAP_PREFIX); 

Spring DSL:

<route>
	<from uri="direct:delete" />
	<log message="delete.."/>
	<setHeader headerName="hazelcast.operation.type">
		<constant>delete</constant>
	</setHeader>
	<to uri="hazelcast:multimap:foo" />
</route>

you can call them in your test class with:

template.sendBodyAndHeader("direct:[put|get|removevalue|delete]", "my-foo", HazelcastConstants.OBJECT_ID, "4711");

For the multimap cache this component provides the same listeners / variables as for the map cache consumer (except the update and enviction listener). The only difference is the multimap prefix inside the URI. Here is a sample:

fromF("hazelcast:%sbar", HazelcastConstants.MULTIMAP_PREFIX)
.log("object...")
.choice()
	.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED))
		.log("...added")
                .to("mock:added")
        //.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ENVICTED))
        //        .log("...envicted")
        //        .to("mock:envicted")
        .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.REMOVED))
                .log("...removed")
                .to("mock:removed")
        .otherwise()
                .log("fail!");

Header Variables inside the response message:

Name Type Description
hazelcast.listener.time Long time of the event in millis
hazelcast.listener.type String the map consumer sets here "cachelistener"
hazelcast.listener.action String type of event - here added and removed (and soon envicted)
hazelcast.objectId String the oid of the object
hazelcast.cache.name String the name of the cache - e.g. "foo"
hazelcast.cache.type String the type of the cache - here multimap

Enviction will be added as feature, soon (this is a Hazelcast issue).

The queue producer provides 6 operations (add, put, poll, peek, offer, removevalue).

from("direct:add")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.ADD_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

from("direct:put")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

from("direct:poll")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.POLL_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

from("direct:peek")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PEEK_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

from("direct:offer")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.OFFER_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

from("direct:removevalue")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.REMOVEVALUE_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

The queue consumer provides 2 operations (add, remove).

fromF("hazelcast:%smm", HazelcastConstants.QUEUE_PREFIX)
   .log("object...")
   .choice()
	.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED))
        	.log("...added")
		.to("mock:added")
	.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.REMOVED))
		.log("...removed")
		.to("mock:removed")
	.otherwise()
		.log("fail!");

The list producer provides 4 operations (add, set, get, removevalue).

from("direct:add")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.ADD_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.LIST_PREFIX);

from("direct:get")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.GET_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.LIST_PREFIX)
.to("seda:out");

from("direct:set")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.SETVALUE_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.LIST_PREFIX);

from("direct:removevalue")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.REMOVEVALUE_OPERATION))
.toF("hazelcast:%sbar", HazelcastConstants.LIST_PREFIX);

[Warning]Warning

Please note that set,get and removevalue and not yet supported by hazelcast, will be added in the future..

The list consumer provides 2 operations (add, remove).

fromF("hazelcast:%smm", HazelcastConstants.LIST_PREFIX)
	.log("object...")
	.choice()
		.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED))
			.log("...added")
                        .to("mock:added")
		.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.REMOVED))
			.log("...removed")
                        .to("mock:removed")
                .otherwise()
                        .log("fail!");

SEDA component differs from the rest components provided. It implements a work-queue in order to support asynchronous SEDA architectures, similar to the core "SEDA" component.

The SEDA producer provides no operations. You only send data to the specified queue.

Java DSL :

from("direct:foo")
.to("hazelcast:seda:foo");

Spring DSL :

<route>
   <from uri="direct:start" />
   <to uri="hazelcast:seda:foo" />
</route>

The SEDA consumer provides no operations. You only retrieve data from the specified queue.

Java DSL :

from("hazelcast:seda:foo")
.to("mock:result");

Spring DSL:

<route>
  <from uri="hazelcast:seda:foo" />
  <to uri="mock:result" />
</route>

[Warning]Warning

There is no consumer for this endpoint!

An atomic number is an object that simply provides a grid wide number (long). The operations for this producer are setvalue (set the number with a given value), get, increase (+1), decrease (-1) and destroy.

Header Variables for the request message:

Name Type Description
hazelcast.operation.type String valid values are: setvalue, get, increase, decrease, destroy

Java DSL:

from("direct:set")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.SETVALUE_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.ATOMICNUMBER_PREFIX);

Spring DSL:

<route>
	<from uri="direct:set" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>setvalue</constant>
	</setHeader>
	<to uri="hazelcast:atomicvalue:foo" />
</route>

Provide the value to set inside the message body (here the value is 10): template.sendBody("direct:set", 10);

Java DSL:

from("direct:get")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.GET_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.ATOMICNUMBER_PREFIX);

Spring DSL:

<route>
	<from uri="direct:get" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>get</constant>
	</setHeader>
	<to uri="hazelcast:atomicvalue:foo" />
</route>

You can get the number with long body = template.requestBody("direct:get", null, Long.class);.

Java DSL:

from("direct:increment")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.INCREMENT_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.ATOMICNUMBER_PREFIX); 

Spring DSL:

<route>
	<from uri="direct:increment" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>increment</constant>
	</setHeader>
	<to uri="hazelcast:atomicvalue:foo" />
</route>

The actual value (after increment) will be provided inside the message body.

Java DSL:

from("direct:decrement")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.DECREMENT_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.ATOMICNUMBER_PREFIX);

Spring DSL:

<route>
	<from uri="direct:decrement" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>decrement</constant>
	</setHeader>
	<to uri="hazelcast:atomicvalue:foo" />
</route>

The actual value (after decrement) will be provided inside the message body.

[Warning]Warning

There's a bug inside Hazelcast. So this feature may not work properly. Will be fixed in 1.9.3.

Java DSL:

from("direct:destroy")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.DESTROY_OPERATION))
.toF("hazelcast:%sfoo", HazelcastConstants.ATOMICNUMBER_PREFIX);

Spring DSL:

<route>
	<from uri="direct:destroy" />
	<setHeader headerName="hazelcast.operation.type">
		<constant>destroy</constant>
	</setHeader>
	<to uri="hazelcast:atomicvalue:foo" />
</route>

[Warning]Warning

This endpoint provides no producer!

Hazelcast makes sense in one single "server node", but it's extremly powerful in a clustered environment. The instance consumer fires if a new cache instance will join or leave the cluster.

Here's a sample:

fromF("hazelcast:%sfoo", HazelcastConstants.INSTANCE_PREFIX)
.log("instance...")
.choice()
	.when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED))
		.log("...added")
		.to("mock:added")
	.otherwise()
		.log("...removed")
		.to("mock:removed");

Each event provides the following information inside the message header:

Header Variables inside the response message:

Name Type Description
hazelcast.listener.time Long time of the event in millis
hazelcast.listener.type String the map consumer sets here "instancelistener"
hazelcast.listener.action String type of event - here added or removed
hazelcast.instance.host String host name of the instance
hazelcast.instance.port Integer port number of the instance