An application can receive a JMS message by creating a consumer for a destination. In the JMS 2.0 simplified API, an application creates a javax.jms.JMSConsumer object from the JMSContext. The createConsumer() method generates the consumer instance.
A JMS 1.1 application explicitly creates a javax.jms.MessageConsumer object by way of a connection factory, a connection, and a session. With this message consumer object, it reads the messages from the destination. The traditional code looks similar to this example:
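// JMS 1.1
Connection conx = queueConnectionFactory.createConnection();
// Non-transacted session, so the AUTO_ACKNOWLEDGE mode actually applies
Session session = conx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
conx.start(); // incoming messages are delivered only after the connection is started
TextMessage textMessage = (TextMessage) consumer.receive(1000); // null if the timeout expires
if (textMessage != null) {
    System.out.printf("Message received was %s%n", textMessage.getText());
}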
In JMS 2.0, writing code with JMSConsumer is preferred. Here is the rewritten example using this object instance:
// JMS 2.0
// context is a JMSContext obtained elsewhere, for example by injection
JMSConsumer consumer = context.createConsumer(queue);
String text = consumer.receiveBody(String.class, 1000);
With the receiveBody() method on the JMSConsumer instance, there is no need to cast the received message to a TextMessage explicitly to get the body of the message, because the call returns an object of the required type directly. If the body cannot be converted to that type, the JMS provider raises a javax.jms.MessageFormatRuntimeException, which is a subclass of javax.jms.JMSRuntimeException.
Synchronous reception of messages occurs when the JMS client makes an invocation to read a message on the channel and the JMS provider does not return the thread of control to the caller until it has a message on the channel that can be retrieved. This is an example of blocking-and-waiting.
Here is a condensed definition of the JMSConsumer interface:
package javax.jms;
public interface JMSConsumer extends AutoCloseable {
    String getMessageSelector();
    MessageListener getMessageListener() throws JMSRuntimeException;
    void setMessageListener(MessageListener listener) throws JMSRuntimeException;
    Message receive();
    Message receive(long timeout);
    Message receiveNoWait();
    void close();
    <T> T receiveBody(Class<T> c);
    <T> T receiveBody(Class<T> c, long timeout);
    <T> T receiveBodyNoWait(Class<T> c);
}
The methods receive(), receive(long timeout), and receiveNoWait() allow a client application to request the next message on the channel, which arrives as an abstract Message object. Unfortunately, the developer has to cast the object to the correct type.
The receiveBody() methods return the payload of the message and have better affordance because they allow the JMS provider to convert the body to the requested type, which is especially useful if the client is not interested in the header of the JMS message. However, if the application requires header information such as the correlation ID of the message, these calls are not appropriate. If the message is not a supported type, or the payload of the message is not assignable to the supplied class type argument, then the provider raises a javax.jms.MessageFormatRuntimeException.
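The assignability rule can be pictured with a small stand-alone sketch. It is only an illustration of the check, not provider code: the class ReceiveBodySketch and the method bodyAs() are hypothetical, and an IllegalArgumentException stands in for the MessageFormatRuntimeException that a real provider would raise.

```java
// Illustration only: mimics the "is the body assignable to the requested type" check.
class ReceiveBodySketch {

    // Returns the payload as the requested type, or fails if it is not assignable.
    static <T> T bodyAs(Object payload, Class<T> type) {
        if (payload != null && !type.isInstance(payload)) {
            // Stand-in for javax.jms.MessageFormatRuntimeException.
            throw new IllegalArgumentException(
                "Body of type " + payload.getClass().getName()
                + " is not assignable to " + type.getName());
        }
        return type.cast(payload);
    }

    public static void main(String[] args) {
        Object payload = "ORDER-42"; // plays the role of a text message body

        String text = bodyAs(payload, String.class);              // exact type
        CharSequence chars = bodyAs(payload, CharSequence.class); // a supertype also works
        System.out.println(text + " has " + chars.length() + " characters");

        try {
            bodyAs(payload, Integer.class); // a String body is not an Integer
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the caller names the type once and gets either a correctly typed value or an exception, with no cast in application code.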
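These receive*() methods operate synchronously. The variants without a timeout block and wait until a message is put on the channel if it is empty at the time of the call. The timeout variants wait no longer than the given number of milliseconds, and the NoWait variants return immediately.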
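The difference between receive(), receive(long timeout), and receiveNoWait() can be illustrated without a JMS provider. The sketch below is only an analogy: a java.util.concurrent.BlockingQueue stands in for the channel, and the class and method names are hypothetical. One difference to keep in mind is that in JMS a timeout of zero means wait indefinitely, which this stand-in does not reproduce.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a BlockingQueue plays the role of the JMS channel.
class ReceiveModesSketch {

    // Like receive(): waits until a message is available.
    static String receive(BlockingQueue<String> channel) throws InterruptedException {
        return channel.take();
    }

    // Like receive(timeout): waits up to timeoutMillis, then gives up and returns null.
    static String receive(BlockingQueue<String> channel, long timeoutMillis)
            throws InterruptedException {
        return channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like receiveNoWait(): returns immediately, with null if nothing is available.
    static String receiveNoWait(BlockingQueue<String> channel) {
        return channel.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();

        System.out.println(receiveNoWait(channel)); // channel is empty, so null
        System.out.println(receive(channel, 50));   // still empty after the wait, so null

        channel.put("order-1");
        System.out.println(receive(channel));       // a message is waiting, so no blocking
    }
}
```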
The close() method terminates the delivery of messages to the consumer. It is the only method on the consumer that can be invoked from a separate thread, which is useful for managed concurrency services in a Java EE environment. Closing a consumer blocks the calling thread if the JMSConsumer is in the middle of at least one receive() call.
In order to receive JMS messages asynchronously, the application registers a javax.jms.MessageListener on the JMSConsumer object instance.
The Java interface for the message listener is really straightforward:
public interface MessageListener {
    void onMessage(Message message);
}
It is considered bad practice for a MessageListener to throw a RuntimeException. Instead, application code should trap fatal errors and log them somewhere or push these failures to a central monitoring server. During the asynchronous delivery of a message, a RuntimeException could cause the JMS provider to redeliver the message over and over again, especially if the session is set to AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE.
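One way to follow this advice is to wrap the real listener in a guard that traps runtime exceptions and hands them to a failure handler. The following is a minimal sketch under stated assumptions: the nested Listener interface is a generic stand-in for javax.jms.MessageListener so that the example compiles without a JMS library, and the names SafeListenerSketch and guarded() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SafeListenerSketch {

    /** Stand-in for javax.jms.MessageListener, generic over the message type. */
    interface Listener<M> {
        void onMessage(M message);
    }

    // Wraps a listener so that no RuntimeException escapes back to the caller
    // (which, in a real application, would be the JMS provider).
    static <M> Listener<M> guarded(Listener<M> delegate, Consumer<RuntimeException> onFailure) {
        return message -> {
            try {
                delegate.onMessage(message);
            } catch (RuntimeException e) {
                onFailure.accept(e); // log it, or push it to a monitoring server
            }
        };
    }

    public static void main(String[] args) {
        List<String> failureLog = new ArrayList<>();

        // Business logic that fails on malformed input.
        Listener<String> parser = text -> System.out.println("Parsed " + Integer.parseInt(text));

        Listener<String> safe = guarded(parser, e -> failureLog.add(e.toString()));
        safe.onMessage("42");         // processed normally
        safe.onMessage("not-a-number"); // failure is trapped and recorded

        System.out.println("Failures recorded: " + failureLog.size());
    }
}
```

Keeping the guard separate from the business logic means the same wrapper can be reused for every listener in the application.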
There is an exception to the rule when the MessageListener is part of a session that is set to SESSION_TRANSACTED, because the session will be committed or rolled back. However, even then the application can explicitly perform the duty and also log the failure for the benefit of maintenance and operational support.
Let's change gears and move from publication to consumption of messages. There are two fundamental ways to consume messages: shareable and non-shareable. These two categories can be further divided into durable or non-durable.
In the JMS 2.0 simplified API, there is no distinction between the way a developer creates a consumer that reads from a queue and one that reads from a topic. The overloaded createConsumer() methods on JMSContext create a JMSConsumer that is neither shareable nor durable.
When the destination is a topic, every call to createConsumer() (and in JMS 1.1, createSubscriber() on the traditional javax.jms.TopicSession) creates a new non-shareable and non-durable subscription without a name. The subscription lives for the duration of the JMSConsumer object.
The createConsumer() method always takes a Destination, which can be a Queue or Topic. Remember there is no distinction in the simplified API. The messageSelector argument in the overloaded variants specifies a filter for messages in the channel. If messageSelector is set to null, then all messages are received.
These are the API definitions:
JMSConsumer createConsumer(Destination destination);
JMSConsumer createConsumer(Destination destination, String messageSelector);
JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
The third variant of createConsumer() accepts a noLocal Boolean parameter, which specifies that messages published to a topic by the consumer's own connection must not be added to the subscription. This flag is designed for JMS consumers that also publish messages on the same connection. Set this flag to true if your application behaves this way.
Once the JMSConsumer object is closed, the subscription is terminated.
JMS 2.0 introduces consumers that can be shared between different Java threads. This feature is particularly aimed at JMS clients running in Java SE environments and fixes a known issue with subscriptions on topic destinations. Messages from a topic are consumed from a subscription, and a subscription receives every message sent to the topic.
The issue with JMS 1.1 was that a subscription on a topic could have only one consumer at a time. This severely limited the scalability of a subscription: an application could not share the work of processing a topic subscription between two Java threads, multiple JVMs, or multiple machines.
In JMS 2.0, there is a new type of topic-only subscription called a shared subscription, which can be durable or non-durable. A shared subscription can have any number of consumers, whether in the same JVM, across two or more JVMs, or on multiple server machines. This makes shared subscriptions eminently scalable for those environments that prefer to rely only on Java SE and a JMS provider.
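The effect of several consumers sharing one subscription can be sketched with plain Java SE concurrency. This is a toy model, not JMS code: a single in-memory queue stands in for the shared subscription, each worker thread plays one consumer, and the names SharedSubscriptionSketch and drain() are hypothetical. The property it illustrates is that every message is handed to exactly one of the consumers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model: one queue plays the shared subscription, each thread plays a consumer.
class SharedSubscriptionSketch {

    // Publishes messageCount messages, lets consumerCount threads drain them,
    // and returns the ids of all messages that were delivered.
    static List<Integer> drain(int messageCount, int consumerCount) {
        BlockingQueue<Integer> subscription = new LinkedBlockingQueue<>();
        for (int id = 0; id < messageCount; id++) {
            subscription.add(id);
        }

        List<Integer> delivered = Collections.synchronizedList(new ArrayList<>());
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int c = 0; c < consumerCount; c++) {
            consumers.execute(() -> {
                Integer id;
                // poll() removes the message, so no other consumer can see it again.
                while ((id = subscription.poll()) != null) {
                    delivered.add(id);
                }
            });
        }

        consumers.shutdown();
        try {
            if (!consumers.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> delivered = drain(1000, 4);
        System.out.println("Deliveries: " + delivered.size());
        System.out.println("Distinct messages: " + new HashSet<>(delivered).size());
    }
}
```

Both counts printed by main() equal the number of published messages, which is the load-sharing behaviour that a shared subscription offers across threads, JVMs, or machines.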
To create a shareable consumer, there are overloaded createSharedConsumer() methods on the JMSContext interface in JMS 2.0.
The shareable consumer API is rather plain:
JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName);
JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector);
Here, the developer must supply a Topic destination and, obviously, the common name of the subscription channel between the threads, JVMs, or machines.
The durable consumers on a topic are extremely useful in e-commerce applications where every subscriber reliably receives all the messages in the channel. This translates to not losing a customer's orders in a large warehouse enterprise or a new or amended trade coming into the bank's straight-through-processing system.
The standard defines a durable subscription as the mode of communication where an application needs to receive all the messages published on a topic, including the messages published when there is no consumer associated with it. The JMS provider has the responsibility to retain a record of the durable subscription and to deliver messages from the topic channel to the consumer. The provider ensures messages are saved until they are delivered and acknowledged by a durable consumer. If a JMS message expires, then the provider is allowed to purge the message from the topic or save it somewhere else for operational support, if any.
In the JMS 2.0 simplified API, durable consumers can be created with a variety of overloaded methods (createDurableConsumer() and createSharedDurableConsumer()) on the JMSContext instance. Durable consumers can be shared or non-shared on a connection.
The API for creating durable subscriptions looks like this:
JMSConsumer createDurableConsumer(Topic topic, String name);
JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal);
JMSConsumer createSharedDurableConsumer(Topic topic, String name);
JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector);
A durable consumer requires a subscription name, which the application must specify. The name, together with the client identifier where one is set, identifies the subscription.
The JMS provider persists durable subscriptions, which immediately creates a trade-off between throughput and storage space. Messages accumulate in the subscription until they are consumed, or until the application deletes the subscription by calling the unsubscribe() method on the JMSContext.
So, inside your order processing subscriber for the warehouse enterprise, you are required to unsubscribe from a durable subscription once it is no longer needed.
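The storage trade-off can be made concrete with a toy in-memory model. It is not a JMS provider and persists nothing to disk: the class DurableTopicSketch and all of its methods are hypothetical, chosen only to show that a named durable subscription keeps collecting messages while no consumer is reading, and that unsubscribing discards the backlog.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of durable subscriptions on one topic.
class DurableTopicSketch {

    private final Map<String, Deque<String>> backlogByName = new HashMap<>();

    // The first use of a name creates the durable subscription.
    void subscribe(String name) {
        backlogByName.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Every durable subscription retains a copy, whether or not a consumer is active.
    void publish(String message) {
        backlogByName.values().forEach(backlog -> backlog.addLast(message));
    }

    // Consuming a message removes it from the backlog; null if nothing is stored.
    String receiveNoWait(String name) {
        Deque<String> backlog = backlogByName.get(name);
        return backlog == null ? null : backlog.pollFirst();
    }

    // Number of messages currently retained for the named subscription.
    int backlog(String name) {
        Deque<String> backlog = backlogByName.get(name);
        return backlog == null ? 0 : backlog.size();
    }

    // Deletes the subscription and discards anything still stored for it.
    void unsubscribe(String name) {
        backlogByName.remove(name);
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.subscribe("warehouse-orders");

        // Nobody is consuming, yet the messages are retained.
        topic.publish("order-1");
        topic.publish("order-2");
        System.out.println("Retained: " + topic.backlog("warehouse-orders"));

        System.out.println("Consumed: " + topic.receiveNoWait("warehouse-orders"));

        topic.unsubscribe("warehouse-orders");
        System.out.println("Retained after unsubscribe: " + topic.backlog("warehouse-orders"));
    }
}
```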
In a Java SE environment, in order to receive messages from the JMS provider, the JMSContext must be started. This is of less importance when an application runs inside an EJB container or a server environment.
For a standalone application, it is important to invoke the JMSContext.start() method, which starts (or restarts) the delivery of incoming messages on the connection.
Conversely, calling the JMSContext.stop() method temporarily stops the delivery of incoming messages on the connection; delivery can be restarted with start(). This call blocks until any receive calls or message listeners in progress have completed.
A Java EE application with an injected, container-managed JMSContext is prohibited from calling these methods. See the section on CDI injection for further information.
Since JMS 2.0, providers are required to set a JMS message property called JMSXDeliveryCount. This value is set to the number of delivery attempts, and it allows an application to know, portably across vendors, when a message has been resent.
The delivery count helps in situations where an incoming message, such as an XML message, has a syntactical or semantic failure that causes the processing to fail. The consumer throws a runtime exception and the stack unwinds all the way down to the JMS provider, which then concludes that the message has failed to be delivered and therefore puts it back on the channel for redelivery. This situation is essentially an infinite loop. Before JMS 2.0, there was no standard means to handle repeatedly failing messages. With the new mandatory property JMSXDeliveryCount, an application can take evasive action.
Given a Message instance, call getIntProperty("JMSXDeliveryCount") to get the delivery count. A value of two or more means the message has been redelivered to the consumer.
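A simple form of evasive action is to stop processing a message once its delivery count passes a threshold and divert it elsewhere. The sketch below shows only the decision logic and rests on several assumptions: the nested Message interface is a stand-in for the single javax.jms.Message method it uses (the real getIntProperty() also declares JMSException), and the threshold MAX_ATTEMPTS, the Action enum, and the decide() method are hypothetical.

```java
class DeliveryCountSketch {

    /** Stand-in for the one javax.jms.Message method used in this sketch. */
    interface Message {
        int getIntProperty(String name);
    }

    /** What the consumer should do with an incoming message. */
    enum Action { PROCESS, DIVERT }

    // Hypothetical limit on delivery attempts before giving up on a message.
    static final int MAX_ATTEMPTS = 3;

    // Decides whether to process the message or divert it, based on the delivery count.
    static Action decide(Message message) {
        int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
        return deliveryCount > MAX_ATTEMPTS ? Action.DIVERT : Action.PROCESS;
    }

    public static void main(String[] args) {
        Message firstDelivery = name -> 1; // delivered for the first time
        Message poisonMessage = name -> 4; // already failed three times

        System.out.println(decide(firstDelivery)); // PROCESS
        System.out.println(decide(poisonMessage)); // DIVERT
    }
}
```

What diverting means is an application decision; logging the message and acknowledging it, or forwarding it to a separate destination for operational support, are two common choices.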
Incidentally, there are other optional standard JMS defined properties that can be useful. Here are some of them:
This concludes the section on publishing messages to JMS and receiving messages. Let us move on to message-driven beans.